-
Notifications
You must be signed in to change notification settings - Fork 4
/
ListStateTest.scala
95 lines (78 loc) · 3.13 KB
/
ListStateTest.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package io.epiphanous.flinkrunner.flink.state
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.util
import java.util.Collections
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector
/**
 * Runs a small keyed Flink pipeline on an embedded mini cluster and checks the
 * records gathered by [[CollectSink]].
 *
 * The pipeline emits the integers 1..4 twice, keys the stream by the value itself,
 * and uses the project's `ListState` wrapper to remember values already seen for a key.
 * When a value is found in the state, `value + 10` is emitted downstream.
 */
class ListStateTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  // Embedded cluster resource: one task manager with two slots.
  val flinkCluster: MiniClusterWithClientResource = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
      .setNumberSlotsPerTaskManager(2)
      .setNumberTaskManagers(1)
      .build
  )

  // Invoke the cluster resource's lifecycle hooks around every test.
  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "ListStateTest1 pipeline" should "group correctly" in {
    // The sink writes into a shared static list, so reset it before running the job.
    CollectSink.values.clear()

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()

    val dataStream: DataStreamSource[Integer] = env.addSource(new SourceFunction[Integer] {
      override def run(ctx: SourceFunction.SourceContext[Integer]): Unit = {
        // Two passes (1 until 3) over 1..4, so every key is emitted exactly twice.
        for (_ <- 1 until 3; element <- List(1, 2, 3, 4)) {
          ctx.collect(Integer.valueOf(element))
        }
      }

      // Deliberately a no-op: the source is finite and returns on its own. It must not
      // throw, because Flink calls cancel() when a job is cancelled.
      override def cancel(): Unit = ()
    })

    dataStream
      .keyBy(new KeySelector[Integer, Integer] {
        // Each record is its own key.
        override def getKey(value: Integer): Integer = value
      })
      .process(new ProcessFunction[Integer, Integer] {
        // Project ListState wrapper under test.
        val listState = new ListState[Integer]()

        override def open(parameters: Configuration): Unit = {
          // NOTE(review): presumably this apply-style call binds the wrapper to the Flink
          // list state registered under "foo" -- confirm against the ListState class.
          this.listState(getRuntimeContext.getListState(new ListStateDescriptor[Integer](
            "foo",
            createTypeInformation[Integer]
          )))
        }

        override def processElement(
            value: Integer,
            ctx: ProcessFunction[Integer, Integer]#Context,
            out: Collector[Integer]
        ): Unit = {
          if (this.listState.isEmpty) {
            println(s"State is empty for element [$value]!")
          }
          // Emit value + 10 only when the value is already present in the state.
          if (this.listState.contains(value)) {
            println(s"State contains $value")
            out.collect(value + 10)
          }
          listState.add(value)
        }
      })
      .addSink(new CollectSink())

    env.execute("ListStateTestLocalJob")

    CollectSink.values should contain allOf(11, 12, 13, 14)
  }
}
/**
 * Sink that appends every record it receives to the shared `CollectSink.values` list.
 */
class CollectSink extends SinkFunction[Integer] {

  /**
   * Stores one record in the shared list.
   *
   * @param value   the record to store
   * @param context sink context passed by the caller; not used here
   */
  override def invoke(value: Integer, context: SinkFunction.Context): Unit = {
    val sharedValues = CollectSink.values
    sharedValues.add(value)
    ()
  }
}
/**
 * Holder for the list that `CollectSink` instances write into.
 */
object CollectSink {

  // An ArrayList wrapped by Collections.synchronizedList; created once with the object.
  val values: util.List[Integer] = {
    val backing = new util.ArrayList[Integer]()
    Collections.synchronizedList(backing)
  }
}