From 8f54635006b8caa2770d552209e7ad7fe2475f96 Mon Sep 17 00:00:00 2001 From: sunjincheng121 Date: Tue, 16 May 2017 11:58:37 +0800 Subject: [PATCH 1/2] [FLINK-6583][talbe]Enable QueryConfig in count base GroupWindow --- .../DataStreamGroupWindowAggregate.scala | 36 ++- .../CountTriggerWithCleanupState.scala | 146 +++++++++ .../table/GroupWindowAggregationsITCase.scala | 10 +- ...ntTriggerWithCleanupStateHarnessTest.scala | 305 ++++++++++++++++++ 4 files changed, 489 insertions(+), 8 deletions(-) create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index c158579aa3557..228c2b51895b0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -25,7 +25,9 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger +import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow} import 
org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory @@ -40,7 +42,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState +import org.slf4j.LoggerFactory class DataStreamGroupWindowAggregate( window: LogicalWindow, @@ -54,6 +57,8 @@ class DataStreamGroupWindowAggregate( grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel { + private val LOG = LoggerFactory.getLogger(this.getClass) + override def deriveRowType(): RelDataType = schema.logicalType override def needsUpdatesAsRetraction = true @@ -131,6 +136,19 @@ class DataStreamGroupWindowAggregate( "non-windowed GroupBy aggregation.") } + val isCountWindow = window match { + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true + case _ => false + } + + if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( + "No state retention interval configured for a query which accumulates state. " + + "Please provide a query configuration with valid retention interval to prevent excessive " + + "state size. 
You may specify a retention time of 0 to not clean up the state.") + } + val outRowType = CRowTypeInfo(schema.physicalTypeInfo) val aggString = aggregationToString( @@ -167,7 +185,7 @@ class DataStreamGroupWindowAggregate( val keyedStream = inputDS.keyBy(physicalGrouping: _*) val windowedStream = - createKeyedWindowedStream(window, keyedStream) + createKeyedWindowedStream(queryConfig, window, keyedStream) .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = @@ -192,7 +210,7 @@ class DataStreamGroupWindowAggregate( physicalNamedProperties) val windowedStream = - createNonKeyedWindowedStream(window, inputDS) + createNonKeyedWindowedStream(queryConfig, window, inputDS) .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = @@ -215,6 +233,7 @@ class DataStreamGroupWindowAggregate( object DataStreamGroupWindowAggregate { private def createKeyedWindowedStream( + queryConfig: StreamQueryConfig, groupWindow: LogicalWindow, stream: KeyedStream[CRow, Tuple]): WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match { @@ -226,6 +245,8 @@ object DataStreamGroupWindowAggregate { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindow(toLong(size)) + .trigger(PurgingTrigger.of( + CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(size)))); case TumblingGroupWindow(_, timeField, size) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) => @@ -245,6 +266,8 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size) => stream.countWindow(toLong(size), toLong(slide)) + .evictor(CountEvictor.of(toLong(size))) + .trigger(CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(slide))); case SlidingGroupWindow(_, timeField, size, slide) if 
isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> @@ -267,6 +290,7 @@ object DataStreamGroupWindowAggregate { } private def createNonKeyedWindowedStream( + queryConfig: StreamQueryConfig, groupWindow: LogicalWindow, stream: DataStream[CRow]): AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match { @@ -278,6 +302,8 @@ object DataStreamGroupWindowAggregate { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindowAll(toLong(size)) + .trigger(PurgingTrigger.of( + CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(size)))); case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => stream.windowAll(TumblingEventTimeWindows.of(toTime(size))) @@ -296,6 +322,8 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindowAll(toLong(size), toLong(slide)) + .evictor(CountEvictor.of(toLong(size))) + .trigger(CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(slide))); case SlidingGroupWindow(_, timeField, size, slide) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala new file mode 100644 index 0000000000000..c6789ba1084f8 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.triggers + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum + +class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long) + extends Trigger[Any, W] { + + private val serialVersionUID: Long = 1L + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + private val stateDesc: ReducingStateDescriptor[JLong] = + new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG) + + private val cleanupStateDesc: ValueStateDescriptor[JLong] = + new ValueStateDescriptor[JLong]("countCleanup", Types.LONG) + + override def canMerge: Boolean = true + + override def onMerge(window: W, ctx: Trigger.OnMergeContext) { + ctx.mergePartitionedState(stateDesc) + } + + override def toString: String 
= "CountTriggerWithCleanupState(" + + "minIdleStateRetentionTime=" + queryConfig.getMinIdleStateRetentionTime + ", " + + "maxIdleStateRetentionTime=" + queryConfig.getMaxIdleStateRetentionTime + ", " + + "maxCount=" + maxCount + ")" + + override def onElement( + element: Any, + timestamp: Long, + window: W, + ctx: TriggerContext): TriggerResult = { + + val currentTime = ctx.getCurrentProcessingTime + + // register cleanup timer + if (stateCleaningEnabled) { + // last registered timer + val curCleanupTime = ctx.getPartitionedState(cleanupStateDesc).value() + + // check if a cleanup timer is registered and + // that the current cleanup timer won't delete state we need to keep + if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { + // we need to register a new (later) timer + val cleanupTime = currentTime + maxRetentionTime + // register timer and remember clean-up time + ctx.registerProcessingTimeTimer(cleanupTime) + + if (null != curCleanupTime) { + ctx.deleteProcessingTimeTimer(curCleanupTime) + } + + ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime) + } + } + + val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc) + count.add(1L) + + if (count.get >= maxCount) { + count.clear() + return TriggerResult.FIRE + } + + return TriggerResult.CONTINUE + } + + override def onProcessingTime( + time: Long, + window: W, + ctx: TriggerContext): TriggerResult = { + + if (stateCleaningEnabled) { + val cleanupTime = ctx.getPartitionedState(cleanupStateDesc).value() + // check that the triggered timer is the last registered processing time timer. 
+ if (null != cleanupTime && time == cleanupTime) { + clear(window, ctx) + return TriggerResult.FIRE_AND_PURGE + } + } + return TriggerResult.CONTINUE + } + + override def onEventTime( + time: Long, + window: W, + ctx: TriggerContext): TriggerResult = { + TriggerResult.CONTINUE + } + + override def clear( + window: W, + ctx: TriggerContext): Unit = { + ctx.getPartitionedState(stateDesc).clear() + ctx.getPartitionedState(cleanupStateDesc).clear() + } + +} + +object CountTriggerWithCleanupState { + + /** + * + * @param maxCount The count of elements at which to fire. + * @tparam W The type of { @link Window Windows} on which this trigger can operate. + */ + def of[W <: Window]( + queryConfig: StreamQueryConfig, + maxCount: Long): CountTriggerWithCleanupState[W] = + new CountTriggerWithCleanupState[W](queryConfig, maxCount) + + class Sum extends ReduceFunction[JLong] { + override def reduce( + value1: JLong, + value2: JLong): JLong = value1 + value2 + } + +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala index 846fe3e83991c..81d3577ffb864 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala @@ -18,13 +18,14 @@ package org.apache.flink.table.api.scala.stream.table +import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import 
org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset @@ -41,7 +42,8 @@ import scala.collection.mutable * programs is possible. */ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { - + private val queryConfig = new StreamQueryConfig() + queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) val data = List( (1L, 1, "Hi"), (2L, 2, "Hello"), @@ -68,7 +70,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('int), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toDataStream[Row](queryConfig) results.addSink(new StreamITCase.StringSink) env.execute() @@ -136,7 +138,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select(countFun('string), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toDataStream[Row](queryConfig) results.addSink(new StreamITCase.StringSink) env.execute() diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala new file mode 100644 index 0000000000000..a6a651af5964d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala @@ -0,0 +1,305 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.harness + +import com.google.common.collect.Lists +import org.apache.flink.api.common.time.Time +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState +import org.junit.Assert.assertEquals +import org.junit.Test + +class CountTriggerWithCleanupStateHarnessTest { + protected var queryConfig = + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) + + @Test + def testFiringAndFireingWithPurging(): Unit = { + val testHarness = new TriggerTestHarness[Any, TimeWindow]( + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer) + + // try to trigger onProcessingTime method via 1, but there is non timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(1).size()) + + // register cleanup timer 
with 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + + // try to trigger onProcessingTime method via 1000, but there is non timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(1000).size()) + + // 1000 + 2000 <= 3001 reuse timer 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + + // there are two state entries, one is timer(3001) another is counter(2) + assertEquals(2, testHarness.numStateEntries) + + // try to trigger onProcessingTime method via 3001, and timer(3001) is triggered + assertEquals( + TriggerResult.FIRE_AND_PURGE, + testHarness.advanceProcessingTime(3001).iterator().next().f1) + + assertEquals(0, testHarness.numStateEntries) + + // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + + // try to trigger onProcessingTime method via 4002, but there is non timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(4002).size()) + + // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + + // 4002 + 2000 <= 7002 reuse timer 7002 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + + // have one timer 7002 + assertEquals(1, testHarness.numProcessingTimeTimers) + assertEquals(0, testHarness.numEventTimeTimers) + assertEquals(2, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9))) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18))) + + // 4002 + 2000 <= 7002 reuse timer 7002 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new 
StreamRecord[Any](1), new TimeWindow(0, 9))) + + // register cleanup timer via 7002 for window (9, 18) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + + // there are four state entries + assertEquals(4, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9))) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(9, 18))) + + // the window counter triggered, count >= 10 + assertEquals( + TriggerResult.FIRE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) + + // counter of window(9, 18) is cleared + assertEquals(3, testHarness.numStateEntries) + + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new 
StreamRecord[Any](1), new TimeWindow(0, 9))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + assertEquals( + TriggerResult.FIRE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) + + // counter of window(0, 9) is cleared + assertEquals(2, testHarness.numStateEntries) + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 9))) + assertEquals(1, testHarness.numStateEntries(new TimeWindow(9, 18))) + + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(18, 27))) + + assertEquals(4, testHarness.numStateEntries) + + // try to trigger onProcessingTime method via 7002, and all states are cleared + assertEquals( + TriggerResult.FIRE_AND_PURGE, + testHarness.advanceProcessingTime(7002).iterator().next().f1) + + assertEquals(0, testHarness.numStateEntries) + } + + /** + * Verify that clear() does not leak across windows. 
+ */ + @Test + def testClear() { + val testHarness = new TriggerTestHarness[Any, TimeWindow]( + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3), + new TimeWindow.Serializer) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 2))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) + // have 2 timers + assertEquals(2, testHarness.numProcessingTimeTimers) + assertEquals(0, testHarness.numEventTimeTimers) + assertEquals(4, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2))) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4))) + testHarness.clearTriggerState(new TimeWindow(2, 4)) + assertEquals(2, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2))) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) + testHarness.clearTriggerState(new TimeWindow(0, 2)) + assertEquals(0, testHarness.numStateEntries) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2))) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) + } + + @Test + def testMergingWindows() { + val testHarness = new TriggerTestHarness[Any, TimeWindow]( + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3), + new TimeWindow.Serializer) + + // first trigger onProcessingTime method via 1 + assertEquals(0, testHarness.advanceProcessingTime(1).size()) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 2))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) + // have 3 timers + assertEquals(3, testHarness.numProcessingTimeTimers) + assertEquals(0, 
testHarness.numEventTimeTimers) + assertEquals(6, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2))) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4))) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) + testHarness.mergeWindows( + new TimeWindow(0, 4), + Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))) + + assertEquals(3, testHarness.numStateEntries) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2))) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 4))) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) + assertEquals( + TriggerResult.FIRE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 4))) + assertEquals(3, testHarness.numStateEntries) + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 4))) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) + assertEquals( + TriggerResult.FIRE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) + assertEquals(2, testHarness.numStateEntries) + + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 2))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) + + // try to trigger onProcessingTime method via 3001 and FIRE_AND_PURGE + val it = testHarness.advanceProcessingTime(3001).iterator() + while (it.hasNext) { + assertEquals( + TriggerResult.FIRE_AND_PURGE, it.next().f1) + } + + assertEquals(0, testHarness.numStateEntries) + } + + @Test + def testMergeSubsumingWindow() { + 
val testHarness = new TriggerTestHarness[Any, TimeWindow]( + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3), + new TimeWindow.Serializer) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) + // have 2 timers + assertEquals(2, testHarness.numProcessingTimeTimers) + assertEquals(0, testHarness.numEventTimeTimers) + assertEquals(4, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4))) + assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) + testHarness.mergeWindows( + new TimeWindow(0, 8), + Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(4, 6))) + assertEquals(1, testHarness.numStateEntries) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(4, 6))) + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 8))) + assertEquals( + TriggerResult.FIRE, + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 8))) + assertEquals(1, testHarness.numStateEntries) + + testHarness.clearTriggerState(new TimeWindow(0, 8)) + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 8))) + } +} From ef114d82af522c73aa5461a3cae5e7db1737b48e Mon Sep 17 00:00:00 2001 From: sunjincheng121 Date: Tue, 16 May 2017 23:30:43 +0800 Subject: [PATCH 2/2] Simplified some code --- .../DataStreamGroupWindowAggregate.scala | 16 +- ....scala => StateCleaningCountTrigger.scala} | 62 ++-- ...ntTriggerWithCleanupStateHarnessTest.scala | 305 ------------------ ...StateCleaningCountTriggerHarnessTest.scala | 147 +++++++++ 4 files changed, 180 insertions(+), 350 deletions(-) rename flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/{CountTriggerWithCleanupState.scala => StateCleaningCountTrigger.scala} 
(71%) delete mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 228c2b51895b0..1ac013a31c7c3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -25,7 +25,6 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.evictors.CountEvictor import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow} import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} @@ -42,7 +41,7 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger import org.slf4j.LoggerFactory class DataStreamGroupWindowAggregate( @@ -245,8 
+244,7 @@ object DataStreamGroupWindowAggregate { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindow(toLong(size)) - .trigger(PurgingTrigger.of( - CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(size)))); + .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size)))); case TumblingGroupWindow(_, timeField, size) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) => @@ -266,8 +264,7 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size) => stream.countWindow(toLong(size), toLong(slide)) - .evictor(CountEvictor.of(toLong(size))) - .trigger(CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(slide))); + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide))); case SlidingGroupWindow(_, timeField, size, slide) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> @@ -302,8 +299,7 @@ object DataStreamGroupWindowAggregate { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindowAll(toLong(size)) - .trigger(PurgingTrigger.of( - CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(size)))); + .trigger(PurgingTrigger.of(StateCleaningCountTrigger.of(queryConfig, toLong(size)))); case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => stream.windowAll(TumblingEventTimeWindows.of(toTime(size))) @@ -322,8 +318,7 @@ object DataStreamGroupWindowAggregate { case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) && isRowCountLiteral(size)=> stream.countWindowAll(toLong(size), toLong(slide)) - .evictor(CountEvictor.of(toLong(size))) - .trigger(CountTriggerWithCleanupState.of[GlobalWindow](queryConfig, toLong(slide))); + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide))); 
case SlidingGroupWindow(_, timeField, size, slide) if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=> @@ -345,6 +340,5 @@ object DataStreamGroupWindowAggregate { stream.windowAll(EventTimeSessionWindows.withGap(toTime(gap))) } - } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala similarity index 71% rename from flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala rename to flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala index c6789ba1084f8..3439258c50734 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala @@ -19,36 +19,38 @@ package org.apache.flink.table.runtime.triggers import java.lang.{Long => JLong} +import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.state._ import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} -import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.table.api.{StreamQueryConfig, Types} -import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger.Sum -class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long) - extends Trigger[Any, W] { +/** + * A {@link Trigger} that fires once the 
count of elements in a pane reaches the given count + * or the cleanup timer is triggered. + */ +@PublicEvolving +class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long) + extends Trigger[Any, GlobalWindow] { - private val serialVersionUID: Long = 1L + private val serialVersionUID = 1L protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 - private val stateDesc: ReducingStateDescriptor[JLong] = + private val stateDesc = new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG) - private val cleanupStateDesc: ValueStateDescriptor[JLong] = + private val cleanupStateDesc = new ValueStateDescriptor[JLong]("countCleanup", Types.LONG) - override def canMerge: Boolean = true + override def canMerge = false - override def onMerge(window: W, ctx: Trigger.OnMergeContext) { - ctx.mergePartitionedState(stateDesc) - } - - override def toString: String = "CountTriggerWithCleanupState(" + + override def toString: String = "StateCleaningCountTrigger(" + "minIdleStateRetentionTime=" + queryConfig.getMinIdleStateRetentionTime + ", " + "maxIdleStateRetentionTime=" + queryConfig.getMaxIdleStateRetentionTime + ", " + "maxCount=" + maxCount + ")" @@ -56,7 +58,7 @@ class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, override def onElement( element: Any, timestamp: Long, - window: W, + window: GlobalWindow, ctx: TriggerContext): TriggerResult = { val currentTime = ctx.getCurrentProcessingTime @@ -82,7 +84,7 @@ class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, } } - val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc) + val count = ctx.getPartitionedState(stateDesc) count.add(1L) if (count.get >= maxCount) { @@ -90,12 +92,12 @@ class CountTriggerWithCleanupState[W <: Window](queryConfig: 
StreamQueryConfig, return TriggerResult.FIRE } - return TriggerResult.CONTINUE + TriggerResult.CONTINUE } override def onProcessingTime( time: Long, - window: W, + window: GlobalWindow, ctx: TriggerContext): TriggerResult = { if (stateCleaningEnabled) { @@ -106,41 +108,33 @@ class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, return TriggerResult.FIRE_AND_PURGE } } - return TriggerResult.CONTINUE + TriggerResult.CONTINUE } - override def onEventTime( - time: Long, - window: W, - ctx: TriggerContext): TriggerResult = { + override def onEventTime(time: Long, window: GlobalWindow, ctx: TriggerContext) = { TriggerResult.CONTINUE } - override def clear( - window: W, - ctx: TriggerContext): Unit = { + override def clear(window: GlobalWindow, ctx: TriggerContext): Unit = { ctx.getPartitionedState(stateDesc).clear() ctx.getPartitionedState(cleanupStateDesc).clear() } } -object CountTriggerWithCleanupState { +object StateCleaningCountTrigger { /** + * Create a [[StateCleaningCountTrigger]] instance. * + * @param queryConfig query configuration. * @param maxCount The count of elements at which to fire. - * @tparam W The type of { @link Window Windows} on which this trigger can operate. 
*/ - def of[W <: Window]( - queryConfig: StreamQueryConfig, - maxCount: Long): CountTriggerWithCleanupState[W] = - new CountTriggerWithCleanupState[W](queryConfig, maxCount) + def of(queryConfig: StreamQueryConfig, maxCount: Long): StateCleaningCountTrigger = + new StateCleaningCountTrigger(queryConfig, maxCount) class Sum extends ReduceFunction[JLong] { - override def reduce( - value1: JLong, - value2: JLong): JLong = value1 + value2 + override def reduce(value1: JLong, value2: JLong): JLong = value1 + value2 } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala deleted file mode 100644 index a6a651af5964d..0000000000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.runtime.harness - -import com.google.common.collect.Lists -import org.apache.flink.api.common.time.Time -import org.apache.flink.streaming.api.windowing.triggers.TriggerResult -import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord -import org.apache.flink.table.api.StreamQueryConfig -import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState -import org.junit.Assert.assertEquals -import org.junit.Test - -class CountTriggerWithCleanupStateHarnessTest { - protected var queryConfig = - new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) - - @Test - def testFiringAndFireingWithPurging(): Unit = { - val testHarness = new TriggerTestHarness[Any, TimeWindow]( - CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer) - - // try to trigger onProcessingTime method via 1, but there is non timer is triggered - assertEquals(0, testHarness.advanceProcessingTime(1).size()) - - // register cleanup timer with 3001 - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - - // try to trigger onProcessingTime method via 1000, but there is non timer is triggered - assertEquals(0, testHarness.advanceProcessingTime(1000).size()) - - // 1000 + 2000 <= 3001 reuse timer 3001 - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - - // there are two state entries, one is timer(3001) another is counter(2) - assertEquals(2, testHarness.numStateEntries) - - // try to trigger onProcessingTime method via 3001, and timer(3001) is triggered - assertEquals( - TriggerResult.FIRE_AND_PURGE, - testHarness.advanceProcessingTime(3001).iterator().next().f1) - - assertEquals(0, 
testHarness.numStateEntries) - - // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001 - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - - // try to trigger onProcessingTime method via 4002, but there is non timer is triggered - assertEquals(0, testHarness.advanceProcessingTime(4002).size()) - - // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001 - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - - // 4002 + 2000 <= 7002 reuse timer 7002 - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - - // have one timer 7002 - assertEquals(1, testHarness.numProcessingTimeTimers) - assertEquals(0, testHarness.numEventTimeTimers) - assertEquals(2, testHarness.numStateEntries) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9))) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18))) - - // 4002 + 2000 <= 7002 reuse timer 7002 - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - - // register cleanup timer via 7002 for window (9, 18) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), 
new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - - // there are four state entries - assertEquals(4, testHarness.numStateEntries) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9))) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(9, 18))) - - // the window counter triggered, count >= 10 - assertEquals( - TriggerResult.FIRE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18))) - - // counter of window(9, 18) is cleared - assertEquals(3, testHarness.numStateEntries) - - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - assertEquals( - TriggerResult.FIRE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9))) - - // counter of window(0, 9) is cleared - assertEquals(2, testHarness.numStateEntries) - assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 9))) - assertEquals(1, testHarness.numStateEntries(new TimeWindow(9, 18))) - - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(18, 27))) - - assertEquals(4, testHarness.numStateEntries) - - // try to 
trigger onProcessingTime method via 7002, and all states are cleared - assertEquals( - TriggerResult.FIRE_AND_PURGE, - testHarness.advanceProcessingTime(7002).iterator().next().f1) - - assertEquals(0, testHarness.numStateEntries) - } - - /** - * Verify that clear() does not leak across windows. - */ - @Test - def testClear() { - val testHarness = new TriggerTestHarness[Any, TimeWindow]( - CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3), - new TimeWindow.Serializer) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 2))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) - // have 2 timers - assertEquals(2, testHarness.numProcessingTimeTimers) - assertEquals(0, testHarness.numEventTimeTimers) - assertEquals(4, testHarness.numStateEntries) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2))) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4))) - testHarness.clearTriggerState(new TimeWindow(2, 4)) - assertEquals(2, testHarness.numStateEntries) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2))) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) - testHarness.clearTriggerState(new TimeWindow(0, 2)) - assertEquals(0, testHarness.numStateEntries) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2))) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) - } - - @Test - def testMergingWindows() { - val testHarness = new TriggerTestHarness[Any, TimeWindow]( - CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3), - new TimeWindow.Serializer) - - // first trigger onProcessingTime method via 1 - assertEquals(0, testHarness.advanceProcessingTime(1).size()) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 2))) - assertEquals( - TriggerResult.CONTINUE, - 
testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) - // have 3 timers - assertEquals(3, testHarness.numProcessingTimeTimers) - assertEquals(0, testHarness.numEventTimeTimers) - assertEquals(6, testHarness.numStateEntries) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2))) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4))) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) - testHarness.mergeWindows( - new TimeWindow(0, 4), - Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))) - - assertEquals(3, testHarness.numStateEntries) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2))) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) - assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 4))) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) - assertEquals( - TriggerResult.FIRE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 4))) - assertEquals(3, testHarness.numStateEntries) - assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 4))) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) - assertEquals( - TriggerResult.FIRE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) - assertEquals(2, testHarness.numStateEntries) - - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 2))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) - - // try to trigger 
onProcessingTime method via 3001 and FIRE_AND_PURGE - val it = testHarness.advanceProcessingTime(3001).iterator() - while (it.hasNext) { - assertEquals( - TriggerResult.FIRE_AND_PURGE, it.next().f1) - } - - assertEquals(0, testHarness.numStateEntries) - } - - @Test - def testMergeSubsumingWindow() { - val testHarness = new TriggerTestHarness[Any, TimeWindow]( - CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3), - new TimeWindow.Serializer) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4))) - assertEquals( - TriggerResult.CONTINUE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(4, 6))) - // have 2 timers - assertEquals(2, testHarness.numProcessingTimeTimers) - assertEquals(0, testHarness.numEventTimeTimers) - assertEquals(4, testHarness.numStateEntries) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4))) - assertEquals(2, testHarness.numStateEntries(new TimeWindow(4, 6))) - testHarness.mergeWindows( - new TimeWindow(0, 8), - Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(4, 6))) - assertEquals(1, testHarness.numStateEntries) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(4, 6))) - assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 8))) - assertEquals( - TriggerResult.FIRE, - testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 8))) - assertEquals(1, testHarness.numStateEntries) - - testHarness.clearTriggerState(new TimeWindow(0, 8)) - assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 8))) - } -} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala new file mode 100644 index 0000000000000..96601fb711220 --- 
/dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.harness + +import org.apache.flink.api.common.time.Time +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger +import org.junit.Assert.assertEquals +import org.junit.Test + +class StateCleaningCountTriggerHarnessTest { + protected var queryConfig = + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) + + @Test + def testFiringAndFiringWithPurging(): Unit = { + val testHarness = new TriggerTestHarness[Any, GlobalWindow]( + StateCleaningCountTrigger.of(queryConfig, 10), new GlobalWindow.Serializer) + + // try to trigger onProcessingTime method via 1, but there is non timer is 
triggered + assertEquals(0, testHarness.advanceProcessingTime(1).size()) + + // register cleanup timer with 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // try to trigger onProcessingTime method via 1000, but no timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(1000).size()) + + // 1000 + 2000 <= 3001 reuse timer 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // there are two state entries, one is timer(3001) another is counter(2) + assertEquals(2, testHarness.numStateEntries) + + // try to trigger onProcessingTime method via 3001, and timer(3001) is triggered + assertEquals( + TriggerResult.FIRE_AND_PURGE, + testHarness.advanceProcessingTime(3001).iterator().next().f1) + + assertEquals(0, testHarness.numStateEntries) + + // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // try to trigger onProcessingTime method via 4002, but no timer is triggered + assertEquals(0, testHarness.advanceProcessingTime(4002).size()) + + // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // 4002 + 2000 <= 7002 reuse timer 7002 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // have one timer 7002 + assertEquals(1, testHarness.numProcessingTimeTimers) + assertEquals(0, testHarness.numEventTimeTimers) + assertEquals(2, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(GlobalWindow.get)) + + // 4002 + 2000 <= 7002 reuse timer 7002 + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), 
GlobalWindow.get)) + + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + assertEquals( + TriggerResult.FIRE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + + // counter of window() is cleared + assertEquals(1, testHarness.numStateEntries) + assertEquals(1, testHarness.numStateEntries(GlobalWindow.get)) + + // try to trigger onProcessingTime method via 7002, and all states are cleared + assertEquals( + TriggerResult.FIRE_AND_PURGE, + testHarness.advanceProcessingTime(7002).iterator().next().f1) + + assertEquals(0, testHarness.numStateEntries) + } + + /** + * Verify that clear() does not leak across windows. + */ + @Test + def testClear() { + val testHarness = new TriggerTestHarness[Any, GlobalWindow]( + StateCleaningCountTrigger.of(queryConfig, 3), + new GlobalWindow.Serializer) + assertEquals( + TriggerResult.CONTINUE, + testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) + // have 1 timer + assertEquals(1, testHarness.numProcessingTimeTimers) + assertEquals(0, testHarness.numEventTimeTimers) + assertEquals(2, testHarness.numStateEntries) + assertEquals(2, testHarness.numStateEntries(GlobalWindow.get)) + + testHarness.clearTriggerState(GlobalWindow.get) + + assertEquals(0, testHarness.numStateEntries) + assertEquals(0, testHarness.numStateEntries(GlobalWindow.get)) + }