From ee9a546eeedbfc5f2aea2a76445f46c8b593a2bd Mon Sep 17 00:00:00 2001
From: Marquis Chamberlain
Date: Thu, 3 Nov 2022 12:42:47 -0400
Subject: [PATCH] working test for list state

---
 .../flink/state/ListStateTest.scala | 55 +++++++++++++------
 1 file changed, 37 insertions(+), 18 deletions(-)

diff --git a/src/test/scala/io/epiphanous/flinkrunner/flink/state/ListStateTest.scala b/src/test/scala/io/epiphanous/flinkrunner/flink/state/ListStateTest.scala
index d95b103d..7ebf26e6 100644
--- a/src/test/scala/io/epiphanous/flinkrunner/flink/state/ListStateTest.scala
+++ b/src/test/scala/io/epiphanous/flinkrunner/flink/state/ListStateTest.scala
@@ -1,33 +1,44 @@
 package io.epiphanous.flinkrunner.flink.state
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.test.util.MiniClusterWithClientResource
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.util
+import java.util.Collections
 import org.apache.flink.api.common.state.ListStateDescriptor
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStreamSource
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.createTypeInformation
 import org.apache.flink.util.Collector
 
-object ListStateTest {
+class ListStateTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
 
+  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+    .setNumberSlotsPerTaskManager(2)
+    .setNumberTaskManagers(1)
+    .build)
+
+  before {
+    flinkCluster.before()
+  }
+
+  after {
+    flinkCluster.after()
+  }
 
-  def main(args: Array[String]): Unit = {
-    /*
-    This local job test should yield:
+  "ListStateTest1 pipeline" should "group correctly" in {
 
-    State is empty for element [1]!
-    State is empty for element [2]!
-    State is empty for element [3]!
-    State is empty for element [4]!
-    State contains 1
-    State contains 2
-    State contains 3
-    State contains 4
-    */
+    CollectSink.values.clear()
 
-    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
+    val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()
 
     val dataStream : DataStreamSource[Integer] = env.addSource(new SourceFunction[Integer] {
       override def run(ctx: SourceFunction.SourceContext[Integer]): Unit = {
@@ -63,14 +74,22 @@ object ListStateTest {
 
       if(this.listState.contains(value)) {
         println(s"State contains $value")
+        out.collect(value + 10)
       }
-
       listState.add(value)
     }
-  })
+  }).addSink(new CollectSink())
 
   env.execute("ListStateTestLocalJob")
-
+  CollectSink.values should contain allOf(11, 12, 13, 14)
 }
+}
 
+class CollectSink extends SinkFunction[Integer] {
+  override def invoke(value: Integer, context: SinkFunction.Context): Unit = {
+    CollectSink.values.add(value)
+  }
+}
 
+object CollectSink {
+  val values: util.List[Integer] = Collections.synchronizedList(new util.ArrayList())
+}
\ No newline at end of file