Skip to content

Commit

Permalink
working test for list state
Browse files Browse the repository at this point in the history
  • Loading branch information
mchamberlain-mdsol committed Nov 3, 2022
1 parent 4294c8b commit ee9a546
Showing 1 changed file with 37 additions and 18 deletions.
@@ -1,33 +1,44 @@
package io.epiphanous.flinkrunner.flink.state

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.util
import java.util.Collections
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector

object ListStateTest {
class ListStateTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build)

before {
flinkCluster.before()
}

after {
flinkCluster.after()
}

def main(args: Array[String]): Unit = {

/*
This local job test should yield:
"ListStateTest1 pipeline" should "group correctly" in {

State is empty for element [1]!
State is empty for element [2]!
State is empty for element [3]!
State is empty for element [4]!
State contains 1
State contains 2
State contains 3
State contains 4
*/
CollectSink.values.clear()

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()

val dataStream : DataStreamSource[Integer] = env.addSource(new SourceFunction[Integer] {
override def run(ctx: SourceFunction.SourceContext[Integer]): Unit = {
Expand Down Expand Up @@ -63,14 +74,22 @@ object ListStateTest {

if(this.listState.contains(value)) {
println(s"State contains $value")
out.collect(value + 10)
}

listState.add(value)
}
})
}).addSink(new CollectSink())

env.execute("ListStateTestLocalJob")

CollectSink.values should contain allOf(11, 12, 13, 14)
}
}

class CollectSink extends SinkFunction[Integer] {
override def invoke(value: Integer, context: SinkFunction.Context): Unit = {
CollectSink.values.add(value)
}
}
object CollectSink {
val values: util.List[Integer] = Collections.synchronizedList(new util.ArrayList())
}

0 comments on commit ee9a546

Please sign in to comment.