Skip to content

Commit

Permalink
list state wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
mchamberlain-mdsol committed Nov 3, 2022
1 parent 96b06a0 commit 4294c8b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package io.epiphanous.flinkrunner.flink.state


class ListState[T] {
class ListState[T] extends Serializable {

var state : org.apache.flink.api.common.state.ListState[T] = _
private var state : org.apache.flink.api.common.state.ListState[T] = _

def apply(value: org.apache.flink.api.common.state.ListState[T]): Unit = {
this.state = value
}

def isEmpty: Boolean = this.state.get().iterator().hasNext
def isEmpty: Boolean = !this.state.get().iterator().hasNext

def contains(element : T) : Boolean = {
if(this.isEmpty) {
return false
}
this.state.get().forEach(
item => {
if(item.equals(element)) {
Expand All @@ -21,4 +24,8 @@ class ListState[T] {
)
false
}

def update(elements : java.util.List[T]) = this.state.update(elements)
def add(element : T): Unit = this.state.add(element)
def addAll(elements : java.util.List[T]) = this.state.addAll(elements)
}
Original file line number Diff line number Diff line change
@@ -1,57 +1,75 @@
package io.epiphanous.flinkrunner.flink.state

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInfo
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{createTypeInformation}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector

object ListStateTest {

def main(args: Array[String]): Unit = {

val dummySourceSeq = 1 to 10 by 1
/*
This local job test should yield:
State is empty for element [1]!
State is empty for element [2]!
State is empty for element [3]!
State is empty for element [4]!
State contains 1
State contains 2
State contains 3
State contains 4
*/

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
val dataStream : DataStreamSource[List[Int]] = env.fromElements(List.fill(10000)(dummySourceSeq.toList))

val d = dataStream.flatMap(new FlatMapFunction[List[Int], Int] {
override def flatMap(value: List[Int], out: Collector[Int]): Unit = {
value.foreach(x => out.collect(x))
val dataStream : DataStreamSource[Integer] = env.addSource(new SourceFunction[Integer] {
override def run(ctx: SourceFunction.SourceContext[Integer]): Unit = {
for( x <- 1 until 3) {
val data = List.fill(1)(List(1, 2, 3, 4)).flatten
for (x <- data) {
ctx.collect(new Integer(x))
}
}
}
}).keyBy(new KeySelector[Int, Int] {
override def getKey(value: Int): Int = {

override def cancel(): Unit = ???
})

dataStream.keyBy(new KeySelector[Integer, Integer] {
override def getKey(value: Integer): Integer = {
value
}
}).process(new ProcessFunction[Int, Int] {
}).process(new ProcessFunction[Integer, Integer] {

val listState = new ListState[Int]()
val listState = new ListState[Integer]()
override def open(parameters: Configuration): Unit = {
val d = new ListStateDescriptor[Int](
this.listState(getRuntimeContext.getListState(new ListStateDescriptor[Integer](
"foo",
createTypeInformation[Int]
)

this.listState(getRuntimeContext.getListState(d))
createTypeInformation[Integer]
)))
}
override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
override def processElement(value: Integer, ctx: ProcessFunction[Integer, Integer]#Context, out: Collector[Integer]): Unit = {

if(this.listState.isEmpty) {
println("State is empty!")
listState.state.add(value)
println(s"State is empty for element [$value]!")
}

if(this.listState.contains(value)) {
println("State contains")
println(s"State contains $value")
}

listState.add(value)
}
})


env.execute("ListStateTestLocalJob")

}

Expand Down

0 comments on commit 4294c8b

Please sign in to comment.