From bfa6f38606d4002202aeb702ad06852b0da9d0d8 Mon Sep 17 00:00:00 2001
From: Fabian Hueske
Date: Tue, 7 Mar 2017 22:17:54 +0100
Subject: [PATCH] [FLINK-5983] [table] Convert FOR into WHILE loops for
 aggregation functions.

---
 .../aggregate/AggregateAggFunction.scala      | 29 +++++++----
 .../aggregate/AggregateMapFunction.scala      | 15 ++++--
 .../AggregateReduceCombineFunction.scala      | 16 ++++--
 .../AggregateReduceGroupFunction.scala        | 52 +++++++++++--------
 ...etSessionWindowAggregatePreProcessor.scala | 27 +++++++---
 ...onWindowAggregateReduceGroupFunction.scala | 36 ++++++++-----
 ...bleCountWindowAggReduceGroupFunction.scala | 32 ++++++++----
 ...leTimeWindowAggReduceCombineFunction.scala | 16 ++++--
 ...mbleTimeWindowAggReduceGroupFunction.scala | 34 +++++++-----
 .../DataSetWindowAggregateMapFunction.scala   | 23 ++++----
 ...ncrementalAggregateAllWindowFunction.scala |  2 +-
 .../IncrementalAggregateWindowFunction.scala  |  8 ++-
 ...boundedProcessingOverProcessFunction.scala |  6 ++-
 13 files changed, 192 insertions(+), 104 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 4d1579b0fd514..11d55e5ba0273 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -36,43 +36,50 @@ class AggregateAggFunction(
     private val aggFields: Array[Int])
   extends DataStreamAggFunc[Row, Row, Row] {

-  val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
-
   override def createAccumulator(): Row = {
     val accumulatorRow: Row = new Row(aggregates.length)
-    aggsWithIdx.foreach { case (agg, i) =>
-      accumulatorRow.setField(i, agg.createAccumulator())
+    var i = 0
+    while (i < aggregates.length) {
+      accumulatorRow.setField(i, aggregates(i).createAccumulator())
+      i += 1
     }
     accumulatorRow
   }

-  override def add(value: Row, accumulatorRow: Row) = {
+  override def add(value: Row, accumulatorRow: Row): Unit = {

-    aggsWithIdx.foreach { case (agg, i) =>
+    var i = 0
+    while (i < aggregates.length) {
       val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
       val v = value.getField(aggFields(i))
-      agg.accumulate(acc, v)
+      aggregates(i).accumulate(acc, v)
+      i += 1
     }
   }

   override def getResult(accumulatorRow: Row): Row = {
     val output = new Row(aggFields.length)
-    aggsWithIdx.foreach { case (agg, i) =>
-      output.setField(i, agg.getValue(accumulatorRow.getField(i).asInstanceOf[Accumulator]))
+    var i = 0
+    while (i < aggregates.length) {
+      val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
+      output.setField(i, aggregates(i).getValue(acc))
+      i += 1
     }
     output
   }

   override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = {

-    aggsWithIdx.foreach { case (agg, i) =>
+    var i = 0
+    while (i < aggregates.length) {
       val aAcc = aAccumulatorRow.getField(i).asInstanceOf[Accumulator]
       val bAcc = bAccumulatorRow.getField(i).asInstanceOf[Accumulator]
       val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
       accumulators.add(aAcc)
       accumulators.add(bAcc)
-      aAccumulatorRow.setField(i, agg.merge(accumulators))
+      aAccumulatorRow.setField(i, aggregates(i).merge(accumulators))
+      i += 1
     }
     aAccumulatorRow
   }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
index d936fbb97be26..cdc833c897ed1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
@@ -32,12 +32,13 @@ class AggregateMapFunction[IN, OUT](
     @transient private val returnType: TypeInformation[OUT])
   extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {

+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
   private var output: Row = _

   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(aggFields)
-    Preconditions.checkArgument(aggregates.length == aggFields.length)
     val partialRowLength = groupingKeys.length + aggregates.length
     output = new Row(partialRowLength)
   }
@@ -45,15 +46,19 @@ class AggregateMapFunction[IN, OUT](

   override def map(value: IN): OUT = {

     val input = value.asInstanceOf[Row]
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val agg = aggregates(i)
       val accumulator = agg.createAccumulator()
       agg.accumulate(accumulator, input.getField(aggFields(i)))
       output.setField(groupingKeys.length + i, accumulator)
+      i += 1
     }
-    for (i <- groupingKeys.indices) {
+    i = 0
+    while (i < groupingKeys.length) {
       output.setField(i, input.getField(groupingKeys(i)))
+      i += 1
     }
     output.asInstanceOf[OUT]
   }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
index 6b95cb8e72718..33ededa05c4b8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -73,15 +73,18 @@ class AggregateReduceCombineFunction(
     val iterator = records.iterator()

     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }

     while (iterator.hasNext) {
       val record = iterator.next()

-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -89,19 +92,24 @@ class AggregateReduceCombineFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
       last = record
     }

     // set the partial merged result to the aggregateBuffer
-    for (i <- aggregates.indices) {
+    i = 0
+    while (i < aggregates.length) {
       preAggOutput.setField(groupKeysMapping.length + i, accumulatorList(i).get(0))
+      i += 1
     }

     // set group keys to aggregateBuffer.
-    for (i <- groupKeysMapping.indices) {
+    i = 0
+    while (i < groupKeysMapping.length) {
       preAggOutput.setField(i, last.getField(i))
+      i += 1
     }

     preAggOutput
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 2f75cd76c9ae0..337f1dd643995 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -48,26 +48,27 @@ class AggregateReduceGroupFunction(
     private val finalRowArity: Int)
   extends RichGroupReduceFunction[Row, Row] {

+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var output: Row = _
-  private var intermediateGroupKeys: Option[Array[Int]] = None
+  private val intermediateGroupKeys: Option[Array[Int]] =
+    if (groupingSetsMapping.nonEmpty) { Some(groupKeysMapping.map(_._1)) } else { None }

   val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
     new JArrayList[Accumulator](2)
   }

   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     output = new Row(finalRowArity)
-    if (!groupingSetsMapping.isEmpty) {
-      intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
-    }

     // init lists with two empty accumulators
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).add(accumulator)
       accumulatorList(i).add(accumulator)
+      i += 1
     }
   }

@@ -86,15 +87,18 @@ class AggregateReduceGroupFunction(
     val iterator = records.iterator()

     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }

     while (iterator.hasNext) {
       val record = iterator.next()

-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -102,31 +106,37 @@ class AggregateReduceGroupFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
       last = record
     }

     // Set group keys value to final output.
-    groupKeysMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, last.getField(previous))
+    i = 0
+    while (i < groupKeysMapping.length) {
+      val (after, previous) = groupKeysMapping(i)
+      output.setField(after, last.getField(previous))
+      i += 1
     }

     // get final aggregate value and set to output.
-    aggregateMapping.foreach {
-      case (after, previous) => {
-        val agg = aggregates(previous)
-        val result = agg.getValue(accumulatorList(previous).get(0))
-        output.setField(after, result)
-      }
+    i = 0
+    while (i < aggregateMapping.length) {
+      val (after, previous) = aggregateMapping(i)
+      val agg = aggregates(previous)
+      val result = agg.getValue(accumulatorList(previous).get(0))
+      output.setField(after, result)
+      i += 1
     }

     // Evaluate additional values of grouping sets
     if (intermediateGroupKeys.isDefined) {
-      groupingSetsMapping.foreach {
-        case (inputIndex, outputIndex) =>
-          output.setField(outputIndex, !intermediateGroupKeys.get.contains(inputIndex))
+      i = 0
+      while (i < groupingSetsMapping.length) {
+        val (inputIndex, outputIndex) = groupingSetsMapping(i)
+        output.setField(outputIndex, !intermediateGroupKeys.get.contains(inputIndex))
+        i += 1
       }
     }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index a299c40e91da7..b99c83e40ae83 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.aggregate
 import java.lang.Iterable
 import java.util.{ArrayList => JArrayList}

-import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction, RichGroupCombineFunction}
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.types.Row
@@ -47,6 +47,9 @@ class DataSetSessionWindowAggregatePreProcessor(
   with GroupCombineFunction[Row,Row]
   with ResultTypeQueryable[Row] {

+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupingKeys)
+
   private var aggregateBuffer: Row = _
   private val accumStartPos: Int = groupingKeys.length
   private val rowTimeFieldPos = accumStartPos + aggregates.length
@@ -56,8 +59,6 @@ class DataSetSessionWindowAggregatePreProcessor(
   }

   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupingKeys)
     aggregateBuffer = new Row(rowTimeFieldPos + 2)

     // init lists with two empty accumulators
@@ -110,9 +111,11 @@ class DataSetSessionWindowAggregatePreProcessor(
     var currentRowTime: java.lang.Long = null

     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }

     val iterator = records.iterator()
@@ -129,21 +132,26 @@ class DataSetSessionWindowAggregatePreProcessor(
           doCollect(out, accumulatorList, windowStart, windowEnd)

           // reset first value of accumulator list
-          for (i <- aggregates.indices) {
+          i = 0
+          while (i < aggregates.length) {
            val accumulator = aggregates(i).createAccumulator()
             accumulatorList(i).set(0, accumulator)
+            i += 1
           }
         } else {
           // set group keys to aggregateBuffer.
-          for (i <- groupingKeys.indices) {
+          i = 0
+          while (i < groupingKeys.length) {
             aggregateBuffer.setField(i, record.getField(i))
+            i += 1
           }
         }

         windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
       }

-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -151,6 +159,7 @@ class DataSetSessionWindowAggregatePreProcessor(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }

       // the current rowtime is the last rowtime of the next calculation.
@@ -178,8 +187,10 @@ class DataSetSessionWindowAggregatePreProcessor(
       windowEnd: Long): Unit = {

     // merge the accumulators into one accumulator
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
+      i += 1
     }

     // intermediate Row WindowStartPos is rowtime pos.
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
index ebef211efb362..68803b4b79383 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
@@ -61,6 +61,9 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
     isInputCombined: Boolean)
   extends RichGroupReduceFunction[Row, Row] {

+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var aggregateBuffer: Row = _
   private var output: Row = _
   private var collector: TimeWindowPropertyCollector = _
@@ -74,8 +77,6 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
   }

   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)
     collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
@@ -104,10 +105,13 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
     var windowEnd: java.lang.Long = null
     var currentRowTime: java.lang.Long = null
+

     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }

     val iterator = records.iterator()
@@ -125,22 +129,27 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
         doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)

         // reset first accumulator in list
-        for (i <- aggregates.indices) {
+        i = 0
+        while (i < aggregates.length) {
           val accumulator = aggregates(i).createAccumulator()
           accumulatorList(i).set(0, accumulator)
+          i += 1
         }
       } else {
         // set group keys value to final output.
-        groupKeysMapping.foreach {
-          case (after, previous) =>
-            output.setField(after, record.getField(previous))
+        i = 0
+        while (i < groupKeysMapping.length) {
+          val (after, previous) = groupKeysMapping(i)
+          output.setField(after, record.getField(previous))
+          i += 1
         }
       }

       windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
     }

-    for (i <- aggregates.indices) {
+    i = 0
+    while (i < aggregates.length) {
       // insert received accumulator into acc list
       val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
       accumulatorList(i).set(1, newAcc)
@@ -148,6 +157,7 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
       val retAcc = aggregates(i).merge(accumulatorList(i))
       // insert result into acc list
       accumulatorList(i).set(0, retAcc)
+      i += 1
     }

     windowEnd = if (isInputCombined) {
@@ -180,10 +190,12 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
       windowEnd: Long): Unit = {

     // merge the accumulators and then get value for the final output
-    aggregateMapping.foreach {
-      case (after, previous) =>
-        val agg = aggregates(previous)
-        output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+    var i = 0
+    while (i < aggregateMapping.length) {
+      val (after, previous) = aggregateMapping(i)
+      val agg = aggregates(previous)
+      output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+      i += 1
     }

     // adds TimeWindow properties to output then emit output
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index 85df1d8865de6..23b5c05605ee8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -47,6 +47,9 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     private val finalRowArity: Int)
   extends RichGroupReduceFunction[Row, Row] {

+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var aggregateBuffer: Row = _
   private var output: Row = _
   private val accumStartPos: Int = groupKeysMapping.length
@@ -57,8 +60,6 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
   }

   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)

@@ -74,21 +75,25 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     var count: Long = 0
     val iterator = records.iterator()

+    var i = 0
     while (iterator.hasNext) {

       if (count == 0) {
         // reset first accumulator
-        for (i <- aggregates.indices) {
+        i = 0
+        while (i < aggregates.length) {
           val accumulator = aggregates(i).createAccumulator()
           accumulatorList(i).set(0, accumulator)
+          i += 1
         }
       }

       val record = iterator.next()
       count += 1

-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -96,20 +101,25 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }

       if (windowSize == count) {
         // set group keys value to final output.
-        groupKeysMapping.foreach {
-          case (after, previous) =>
-            output.setField(after, record.getField(previous))
+        i = 0
+        while (i < groupKeysMapping.length) {
+          val (after, previous) = groupKeysMapping(i)
+          output.setField(after, record.getField(previous))
+          i += 1
         }

         // merge the accumulators and then get value for the final output
-        aggregateMapping.foreach {
-          case (after, previous) =>
-            val agg = aggregates(previous)
-            output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+        i = 0
+        while (i < aggregateMapping.length) {
+          val (after, previous) = aggregateMapping(i)
+          val agg = aggregates(previous)
+          output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+          i += 1
         }

         // emit the output
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
index df8bed9ac91dd..c618325ec1f22 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
@@ -70,15 +70,18 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
     val iterator = records.iterator()

     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }

     while (iterator.hasNext) {
       val record = iterator.next()

-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -86,19 +89,24 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
       last = record
     }

     // set the partial merged result to the aggregateBuffer
-    for (i <- aggregates.indices) {
+    i = 0
+    while (i < aggregates.length) {
       aggregateBuffer.setField(groupKeysMapping.length + i, accumulatorList(i).get(0))
+      i += 1
     }

     // set group keys to aggregateBuffer.
-    for (i <- groupKeysMapping.indices) {
+    i = 0
+    while (i < groupKeysMapping.length) {
       aggregateBuffer.setField(i, last.getField(i))
+      i += 1
     }

     // set the rowtime attribute
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 7ce0bf149e703..f85f2cd305e28 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -51,6 +51,9 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     finalRowArity: Int)
   extends RichGroupReduceFunction[Row, Row] {

+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var collector: TimeWindowPropertyCollector = _
   protected var aggregateBuffer: Row = _
   private var output: Row = _
@@ -64,8 +67,6 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
   }

   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)
     collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
@@ -84,15 +85,18 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     val iterator = records.iterator()

     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }

     while (iterator.hasNext) {
       val record = iterator.next()

-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -100,24 +104,28 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
       last = record
     }

     // set group keys value to final output.
-    groupKeysMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, last.getField(previous))
+    i = 0
+    while (i < groupKeysMapping.length) {
+      val (after, previous) = groupKeysMapping(i)
+      output.setField(after, last.getField(previous))
+      i += 1
     }

     // get final aggregate value and set to output.
-    aggregateMapping.foreach {
-      case (after, previous) => {
-        val agg = aggregates(previous)
-        val result = agg.getValue(accumulatorList(previous).get(0))
-        output.setField(after, result)
-      }
+    i = 0
+    while (i < aggregateMapping.length) {
+      val (after, previous) = aggregateMapping(i)
+      val agg = aggregates(previous)
+      val result = agg.getValue(accumulatorList(previous).get(0))
+      output.setField(after, result)
+      i += 1
     }

     // get window start timestamp
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
index 68088fc298fdb..0be4881861d92 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
@@ -43,33 +43,36 @@ class DataSetWindowAggregateMapFunction(
     @transient private val returnType: TypeInformation[Row])
   extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {

+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
   private var output: Row = _
+  // add one more arity to store rowtime
+  private val partialRowLength = groupingKeys.length + aggregates.length + 1
   // rowtime index in the buffer output row
-  private var rowtimeIndex: Int = _
+  private val rowtimeIndex: Int = partialRowLength - 1

   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(aggFields)
-    Preconditions.checkArgument(aggregates.length == aggFields.length)
-    // add one more arity to store rowtime
-    val partialRowLength = groupingKeys.length + aggregates.length + 1
-    // set rowtime to the last field of the output row
-    rowtimeIndex = partialRowLength - 1
     output = new Row(partialRowLength)
   }

   override def map(input: Row): Row = {

-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val agg = aggregates(i)
       val fieldValue = input.getField(aggFields(i))
       val accumulator = agg.createAccumulator()
       agg.accumulate(accumulator, fieldValue)
       output.setField(groupingKeys.length + i, accumulator)
+      i += 1
     }

-    for (i <- groupingKeys.indices) {
+    i = 0
+    while (i < groupingKeys.length) {
       output.setField(i, input.getField(groupingKeys(i)))
+      i += 1
     }

     val timeField = input.getField(timeFieldPos)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index 00aba1fcf552c..5992d814a8020 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -23,7 +23,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector

 /**
   * Computes the final aggregate value from incrementally computed aggreagtes.
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index a4d4837dac264..983efb3bdad06 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -60,11 +60,15 @@ class IncrementalAggregateWindowFunction[W <: Window](

     if (iterator.hasNext) {
       val record = iterator.next()
-      for (i <- 0 until numGroupingKey) {
+      var i = 0
+      while (i < numGroupingKey) {
         output.setField(i, key.getField(i))
+        i += 1
       }
-      for (i <- 0 until numAggregates) {
+      i = 0
+      while (i < numAggregates) {
         output.setField(numGroupingKey + i, record.getField(i))
+        i += 1
       }
       out.collect(output)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
index 058b4a7eee093..41f8e8cd20d55 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
@@ -52,18 +52,20 @@ class UnboundedProcessingOverProcessFunction(
     ctx: ProcessFunction[Row, Row]#Context,
     out: Collector[Row]): Unit = {

+    var i = 0
+
    var accumulators = state.value()

     if (null == accumulators) {
       accumulators = new Row(aggregates.length)
-      var i = 0
+      i = 0
       while (i < aggregates.length) {
         accumulators.setField(i, aggregates(i).createAccumulator())
         i += 1
       }
     }

-    var i = 0
+    i = 0
     while (i < forwardedFieldCount) {
       output.setField(i, input.getField(i))
       i += 1
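
Reviewer note: the rewrite above is purely mechanical, and it helps to state why it pays off. In Scala 2.x, a loop such as `for (i <- aggregates.indices)` desugars into a `Range.foreach` call taking a closure, so the per-record hot path carries a `Range` and a closure object plus a dispatch through `foreach`; the JIT often eliminates that overhead, but it is not guaranteed to. A hand-written `var i = 0; while (i < n)` loop compiles directly to index arithmetic and a conditional jump. The self-contained sketch below illustrates the before/after shape of every hunk in this patch — `WhileLoopSketch`, `sumFor`, and `sumWhile` are hypothetical helpers for illustration only, not code from the Flink source.

    object WhileLoopSketch {

      // Before: desugars to values.indices.foreach(i => total += values(i)),
      // building a Range and a closure and dispatching through foreach.
      // The captured local `total` is also boxed into a mutable reference.
      def sumFor(values: Array[Int]): Int = {
        var total = 0
        for (i <- values.indices) {
          total += values(i)
        }
        total
      }

      // After: plain index arithmetic and a conditional jump in bytecode;
      // nothing is allocated on the hot path. Same bounds, same order.
      def sumWhile(values: Array[Int]): Int = {
        var total = 0
        var i = 0
        while (i < values.length) {
          total += values(i)
          i += 1
        }
        total
      }
    }

Because the transformation preserves iteration order and bounds, every hunk follows the identical `var i = 0` / `while` / `i += 1` shape, which also keeps the patch straightforward to review.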