Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mchamberlain-mdsol committed Nov 11, 2022
1 parent ee9a546 commit bc066cc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 35 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.epiphanous.flinkrunner.flink.state
import scala.collection.JavaConverters._

object RichStateUtils {
implicit class RichListState[T](listState: org.apache.flink.api.common.state.ListState[T]) {
def _iterator: Iterator[T] = listState.get().iterator().asScala
def isEmpty: Boolean = _iterator.isEmpty
def contains(element: T): Boolean = _iterator.contains(element)
def find(element: T) : T = _iterator.find( v => v.equals(element)).get
def length: Int = _iterator.length
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util
import java.util.Collections
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector
import RichStateUtils._

class ListStateTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

Expand Down Expand Up @@ -59,12 +61,13 @@ class ListStateTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
}
}).process(new ProcessFunction[Integer, Integer] {

val listState = new ListState[Integer]()
var listState: org.apache.flink.api.common.state.ListState[Integer] = _

override def open(parameters: Configuration): Unit = {
this.listState(getRuntimeContext.getListState(new ListStateDescriptor[Integer](
this.listState = getRuntimeContext.getListState(new ListStateDescriptor[Integer](
"foo",
createTypeInformation[Integer]
)))
))
}
override def processElement(value: Integer, ctx: ProcessFunction[Integer, Integer]#Context, out: Collector[Integer]): Unit = {

Expand Down

0 comments on commit bc066cc

Please sign in to comment.