From 7fbdc100e19d69d7e31544c20fa94cb2b314ec12 Mon Sep 17 00:00:00 2001 From: kl0u Date: Wed, 22 Mar 2017 15:52:07 +0100 Subject: [PATCH 1/2] [FLINK-6197] [cep] Add support for iterative conditions. So far, the where clause only supported simple FilterFunction conditions. With this, we add support for conditions where an event is accepted not only based on its own properties, e.g. name, as it was before, but also based on some statistic computed over previously accepted events in the pattern, e.g. if the price is higher than the average of the prices of the previously accepted events. --- docs/dev/libs/cep.md | 94 ++- .../flink/cep/scala/pattern/Pattern.scala | 30 +- .../flink/cep/scala/pattern/PatternTest.scala | 44 +- .../flink/cep/nfa/ComputationState.java | 81 +- .../java/org/apache/flink/cep/nfa/NFA.java | 251 +++--- .../apache/flink/cep/nfa/SharedBuffer.java | 118 +-- .../java/org/apache/flink/cep/nfa/State.java | 18 +- .../apache/flink/cep/nfa/StateTransition.java | 51 +- .../flink/cep/nfa/compiler/NFACompiler.java | 47 +- .../flink/cep/pattern/AndFilterFunction.java | 6 +- .../flink/cep/pattern/OrFilterFunction.java | 6 +- .../org/apache/flink/cep/pattern/Pattern.java | 44 +- .../cep/pattern/SubtypeFilterFunction.java | 6 +- .../cep/pattern/conditions/AndCondition.java | 57 ++ .../BooleanConditions.java} | 29 +- .../conditions/IterativeCondition.java | 98 +++ .../NotCondition.java} | 18 +- .../cep/pattern/conditions/OrCondition.java | 57 ++ .../pattern/conditions/SimpleCondition.java | 39 + .../pattern/conditions/SubtypeCondition.java | 41 + .../java/org/apache/flink/cep/CEPITCase.java | 42 +- .../org/apache/flink/cep/nfa/NFAITCase.java | 726 ++++++++++++++---- .../org/apache/flink/cep/nfa/NFATest.java | 18 +- .../flink/cep/nfa/SharedBufferTest.java | 9 +- .../cep/nfa/compiler/NFACompilerTest.java | 8 +- .../cep/operator/CEPMigration11to13Test.java | 8 +- .../cep/operator/CEPMigration12to13Test.java | 8 +- .../flink/cep/operator/CEPOperatorTest.java | 10 +- .../flink/cep/operator/CEPRescalingTest.java | 8 +- .../apache/flink/cep/pattern/PatternTest.java | 52 +- 30 files changed, 1495 insertions(+), 529 deletions(-) create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java rename flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/{FilterFunctions.java => conditions/BooleanConditions.java} (57%) create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java rename flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/{NotFilterFunction.java => conditions/NotCondition.java} (67%) create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index 22cffbcdcc398..9d4ca913c2f7e 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start") -Each state must have an unique name to identify the matched events later on. +Each state must have a unique name to identify the matched events later on. Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method. +These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`. + +**Iterative Conditions:** This type of conditions can iterate over the previously accepted elements in the pattern and +decide to accept a new element or not, based on some statistic over those elements. + +Below is the code for an iterative condition that accepts elements whose name start with "foo" and for which, the sum +of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, do +not exceed the value of 5.0. Iterative condition can be very powerful, especially in combination with quantifiers, e.g. +`oneToMany` or `zeroToMany`. + +
+
+{% highlight java %} +start.where(new IterativeCondition() { + @Override + public boolean filter(SubEvent value, Context ctx) throws Exception { + if (!value.getName().startsWith("foo")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("middle")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } +}); +{% endhighlight %} +
+ +
+{% highlight scala %} +start.where( + (value, ctx) => { + var res = value.getName.startsWith("foo") + if (res) { + var sum = 0.0 + for (e: Event <- ctx.getEventsForPattern("middle")) { + sum += e.getPrice + } + sum += value.getPrice + res = res && sum < 5.0 + } + res + } +) +{% endhighlight %} +
+
+ +Attention The call to `Context.getEventsForPattern(...)` has to find the +elements that belong to the pattern. The cost of this operation can vary, so when implementing your condition, try +to minimize the times the method is called. + +**Simple Conditions:** This type of conditions extend the aforementioned `IterativeCondition` class. They are simple +filtering conditions that decide to accept an element or not, based only on properties of the element itself.
{% highlight java %} -start.where(new FilterFunction() { +start.where(new SimpleCondition() { @Override public boolean filter(Event value) { return ... // some condition @@ -151,7 +208,7 @@ We can also restrict the type of the accepted event to some subtype of the initi
{% highlight java %} -start.subtype(SubEvent.class).where(new FilterFunction() { +start.subtype(SubEvent.class).where(new SimpleCondition() { @Override public boolean filter(SubEvent value) { return ... // some condition @@ -168,7 +225,7 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
As it can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype. -In fact you can always provide multiple conditions by calling `where` and `subtype` multiple times. +In fact, you can always provide multiple conditions by calling `where` and `subtype` multiple times. These conditions will then be combined using the logical AND operator. In order to construct or conditions, one has to call the `or` method with a respective filter function. @@ -177,12 +234,12 @@ Any existing filter function is then ORed with the given one.
{% highlight java %} -pattern.where(new FilterFunction() { +pattern.where(new SimpleCondition() { @Override public boolean filter(Event value) { return ... // some condition } -}).or(new FilterFunction() { +}).or(new SimpleCondition() { @Override public boolean filter(Event value) { return ... // or condition @@ -201,8 +258,8 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition Next, we can append further states to detect complex patterns. We can control the contiguity of two succeeding events to be accepted by the pattern. -Strict contiguity means that two matching events have to succeed directly. -This means that no other events can occur in between. +Strict contiguity means that two matching events have to be directly the one after the other. +This means that no other events can occur in between. A strict contiguity pattern state can be created via the `next` method.
@@ -236,7 +293,8 @@ val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
It is also possible to define a temporal constraint for the pattern to be valid. -For example, one can define that a pattern should occur within 10 seconds via the `within` method. +For example, one can define that a pattern should occur within 10 seconds via the `within` method. +Temporal patterns are supported for both [processing and event time]({{site.baseurl}}/dev/event_time.html).
@@ -294,11 +352,11 @@ Pattern followedBy = start.followedBy("next"); Where -

Defines a filter condition for the current pattern state. Only if an event passes the filter, it can match the state:

+

Defines a condition for the current pattern state. Only if an event satisifes the condition, it can match the state:

{% highlight java %} -patternState.where(new FilterFunction() { +patternState.where(new IterativeCondition() { @Override - public boolean filter(Event value) throws Exception { + public boolean filter(Event value, Context ctx) throws Exception { return ... // some condition } }); @@ -310,14 +368,14 @@ patternState.where(new FilterFunction() {

Adds a new filter condition which is ORed with an existing filter condition. Only if an event passes the filter condition, it can match the state:

{% highlight java %} -patternState.where(new FilterFunction() { +patternState.where(new IterativeCondition() { @Override - public boolean filter(Event value) throws Exception { + public boolean filter(Event value, Context ctx) throws Exception { return ... // some condition } -}).or(new FilterFunction() { +}).or(new IterativeCondition() { @Override - public boolean filter(Event value) throws Exception { + public boolean filter(Event value, Context ctx) throws Exception { return ... // alternative condition } }); @@ -684,12 +742,12 @@ DataStream partitionedInput = input.keyBy(new KeySelector }); Pattern pattern = Pattern.begin("start") - .next("middle").where(new FilterFunction() { + .next("middle").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("error"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("critical"); diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index 5baf78081407d..a1db460a46326 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -17,8 +17,9 @@ */ package org.apache.flink.cep.scala.pattern -import org.apache.flink.api.common.functions.FilterFunction import org.apache.flink.cep +import org.apache.flink.cep.pattern.conditions.IterativeCondition +import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern} import org.apache.flink.streaming.api.windowing.time.Time @@ -67,8 +68,8 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { * * @return Filter condition for an event to be matched */ - def getFilterFunction(): Option[FilterFunction[F]] = { - Option(jPattern.getFilterFunction()) + def getCondition(): Option[IterativeCondition[F]] = { + Option(jPattern.getCondition()) } /** @@ -127,7 +128,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { * @param filter Filter condition * @return The same pattern operator where the new filter condition is set */ - def where(filter: FilterFunction[F]): Pattern[T, F] = { + def where(filter: IterativeCondition[F]): Pattern[T, F] = { jPattern.where(filter) this } @@ -138,11 +139,26 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { * @param filter Or filter function * @return The same pattern operator where the new filter condition is set */ - def or(filter: FilterFunction[F]): Pattern[T, F] = { + def or(filter: IterativeCondition[F]): Pattern[T, F] = { jPattern.or(filter) this } + /** + * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * + * @param filterFun Filter condition + * @return The same pattern operator where the new filter condition is set + */ + def where(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = { + val filter = new IterativeCondition[F] { + val cleanFilter = cep.scala.cleanClosure(filterFun) + + override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ctx) + } + where(filter) + } + /** * Specifies a filter condition which has to be fulfilled by an event in order to be matched. * @@ -150,10 +166,10 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { * @return The same pattern operator where the new filter condition is set */ def where(filterFun: F => Boolean): Pattern[T, F] = { - val filter = new FilterFunction[F] { + val filter = new IterativeCondition[F] { val cleanFilter = cep.scala.cleanClosure(filterFun) - override def filter(value: F): Boolean = cleanFilter(value) + override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value) } where(filter) } diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala index 5f49031fe10c9..a95dddd460c3b 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala @@ -17,12 +17,13 @@ */ package org.apache.flink.cep.scala.pattern -import org.apache.flink.api.common.functions.FilterFunction -import org.apache.flink.cep.pattern.{AndFilterFunction, SubtypeFilterFunction, Pattern => JPattern} +import org.apache.flink.cep.pattern.{Pattern => JPattern} import org.junit.Assert._ import org.junit.Test import org.apache.flink.cep.Event import org.apache.flink.cep.SubEvent +import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context +import org.apache.flink.cep.pattern.conditions._ class PatternTest { @@ -80,19 +81,19 @@ class PatternTest { def testStrictContiguityWithCondition: Unit = { val pattern = Pattern.begin[Event]("start") .next("next") - .where((value: Event) => value.getName() == "foobar") + .where((value: Event, ctx: Context[Event]) => value.getName() == "foobar") .next("end") - .where((value: Event) => value.getId() == 42) + .where((value: Event, ctx: Context[Event]) => value.getId() == 42) val jPattern = JPattern.begin[Event]("start") .next("next") - .where(new FilterFunction[Event]() { + .where(new SimpleCondition[Event]() { @throws[Exception] def filter(value: Event): Boolean = { return value.getName() == "foobar" } }).next("end") - .where(new FilterFunction[Event]() { + .where(new SimpleCondition[Event]() { @throws[Exception] def filter(value: Event): Boolean = { return value.getId() == 42 @@ -109,9 +110,9 @@ class PatternTest { assertTrue(previous.getPrevious.isDefined) assertFalse(preprevious.getPrevious.isDefined) - assertTrue(pattern.getFilterFunction.isDefined) - assertTrue(previous.getFilterFunction.isDefined) - assertFalse(preprevious.getFilterFunction.isDefined) + assertTrue(pattern.getCondition.isDefined) + assertTrue(previous.getCondition.isDefined) + assertFalse(preprevious.getCondition.isDefined) assertEquals(pattern.getName, "end") assertEquals(previous.getName, "next") @@ -140,8 +141,8 @@ class PatternTest { assertTrue(previous.getPrevious.isDefined) assertFalse(preprevious.getPrevious.isDefined) - assertTrue(previous.getFilterFunction.isDefined) - assertTrue(previous.getFilterFunction.get.isInstanceOf[SubtypeFilterFunction[_]]) + assertTrue(previous.getCondition.isDefined) + assertTrue(previous.getCondition.get.isInstanceOf[SubtypeCondition[_]]) assertEquals(pattern.getName, "end") assertEquals(previous.getName, "subevent") @@ -159,7 +160,7 @@ class PatternTest { val jpattern = JPattern.begin[Event]("start") .next("subevent") .subtype(classOf[SubEvent]) - .where(new FilterFunction[SubEvent]() { + .where(new SimpleCondition[SubEvent]() { @throws[Exception] def filter(value: SubEvent): Boolean = { return false @@ -178,7 +179,7 @@ class PatternTest { assertFalse(preprevious.getPrevious.isDefined) assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]]) - assertTrue(previous.getFilterFunction.isDefined) + assertTrue(previous.getCondition().isDefined) assertEquals(pattern.getName, "end") assertEquals(previous.getName, "subevent") @@ -206,8 +207,8 @@ class PatternTest { jPattern.getClass().getSimpleName()) //best effort to confirm congruent filter functions && compareFilterFunctions( - pattern.getFilterFunction.orNull, - jPattern.getFilterFunction()) + pattern.getCondition().orNull, + jPattern.getCondition()) //recursively check previous patterns && checkCongruentRepresentations( pattern.getPrevious.orNull, @@ -218,7 +219,8 @@ class PatternTest { a == b && b == c } - def compareFilterFunctions(sFilter: FilterFunction[_], jFilter: FilterFunction[_]): Boolean = { + def compareFilterFunctions(sFilter: IterativeCondition[_], + jFilter: IterativeCondition[_]): Boolean = { /** * We would like to simply compare the filter functions like this: * @@ -230,16 +232,16 @@ class PatternTest { */ (sFilter, jFilter) match { //matching types: and-filter; branch and recurse for inner filters - case (saf: AndFilterFunction[_], jaf: AndFilterFunction[_]) + case (saf: AndCondition[_], jaf: AndCondition[_]) => (compareFilterFunctions(saf.getLeft(), jaf.getLeft()) && compareFilterFunctions(saf.getRight(), jaf.getRight())) //matching types: subtype-filter - case (saf: SubtypeFilterFunction[_], jaf: SubtypeFilterFunction[_]) => true + case (saf: SubtypeCondition[_], jaf: SubtypeCondition[_]) => true //mismatch: one-sided and/subtype-filter - case (_: AndFilterFunction[_] | _: SubtypeFilterFunction[_], _) => false - case (_, _: AndFilterFunction[_] | _: SubtypeFilterFunction[_]) => false + case (_: AndCondition[_] | _: SubtypeCondition[_], _) => false + case (_, _: AndCondition[_] | _: SubtypeCondition[_]) => false //from here we can only check mutual presence or absence of a function - case (s: FilterFunction[_], j: FilterFunction[_]) => true + case (s: IterativeCondition[_], j: IterativeCondition[_]) => true case (null, null) => true case _ => false } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java index 445d038cc48ee..80227fc99f77b 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -18,8 +18,14 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.util.Preconditions; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + /** * Helper class which encapsulates the state of the NFA computation. It points to the current state, * the last taken event, its occurrence timestamp, the current version and the starting timestamp @@ -45,7 +51,10 @@ public class ComputationState { private final State previousState; + private final ConditionContext conditionContext; + private ComputationState( + final NFA nfa, final State currentState, final State previousState, final T event, @@ -58,6 +67,11 @@ private ComputationState( this.version = version; this.startTimestamp = startTimestamp; this.previousState = previousState; + this.conditionContext = new ConditionContext(nfa, this); + } + + public ConditionContext getConditionContext() { + return conditionContext; } public boolean isFinalState() { @@ -92,23 +106,80 @@ public DeweyNumber getVersion() { return version; } - public static ComputationState createStartState(final State state) { + public static ComputationState createStartState(final NFA nfa, final State state) { Preconditions.checkArgument(state.isStart()); - return new ComputationState<>(state, null, null, -1L, new DeweyNumber(1), -1L); + return new ComputationState<>(nfa, state, null, null, -1L, new DeweyNumber(1), -1L); } - public static ComputationState createStartState(final State state, final DeweyNumber version) { + public static ComputationState createStartState(final NFA nfa, final State state, final DeweyNumber version) { Preconditions.checkArgument(state.isStart()); - return new ComputationState<>(state, null, null, -1L, version, -1L); + return new ComputationState<>(nfa, state, null, null, -1L, version, -1L); } public static ComputationState createState( + final NFA nfa, final State currentState, final State previousState, final T event, final long timestamp, final DeweyNumber version, final long startTimestamp) { - return new ComputationState<>(currentState, previousState, event, timestamp, version, startTimestamp); + return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp); + } + + /** + * The context used when evaluating this computation state. + */ + public class ConditionContext implements IterativeCondition.Context { + + private static final long serialVersionUID = -6733978464782277795L; + + /** + * A flag indicating if we should recompute the matching pattern, so that + * the {@link IterativeCondition iterative condition} can be evaluated. + */ + private boolean shouldUpdate; + + /** The current computation state. */ + private transient ComputationState computationState; + + /** The owning {@link NFA} of this computation state. */ + private final NFA nfa; + + /** + * The matched pattern so far. A condition will be evaluated over this + * pattern. This is evaluated only once, as this is an expensive + * operation that traverses a path in the {@link SharedBuffer}. + */ + private transient Map> matchedEvents; + + public ConditionContext(NFA nfa, ComputationState computationState) { + this.nfa = nfa; + this.computationState = computationState; + this.shouldUpdate = true; + } + + @Override + public Iterable getEventsForPattern(final String key) { + Preconditions.checkNotNull(key); + + // the (partially) matched pattern is computed lazily when this method is called. + // this is to avoid any overheads when using a simple, non-iterative condition. + + if (shouldUpdate) { + this.matchedEvents = nfa.extractCurrentMatches(computationState); + shouldUpdate = false; + } + + return new Iterable() { + @Override + public Iterator iterator() { + List elements = matchedEvents.get(key); + return elements == null + ? Collections.EMPTY_LIST.iterator() + : elements.iterator(); + } + }; + } } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index ab03566ae9011..cddc1edf759a5 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -21,18 +21,19 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.LinkedHashMultimap; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.io.ByteArrayInputStream; @@ -140,10 +141,9 @@ public NFA( this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer); this.windowTime = windowTime; this.handleTimeout = handleTimeout; - stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); - computationStates = new LinkedList<>(); - - states = new HashSet<>(); + this.stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); + this.computationStates = new LinkedList<>(); + this.states = new HashSet<>(); } public Set> getStates() { @@ -160,7 +160,7 @@ public void addState(final State state) { states.add(state); if (state.isStart()) { - computationStates.add(ComputationState.createStartState(state)); + computationStates.add(ComputationState.createStartState(this, state)); } } @@ -214,10 +214,6 @@ public Tuple2>, Collection, Long computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp()); - stringSharedBuffer.remove( - computationState.getPreviousState().getName(), - computationState.getEvent(), - computationState.getTimestamp()); newComputationStates = Collections.emptyList(); } else if (event != null) { @@ -233,8 +229,10 @@ public Tuple2>, Collection, Long result.addAll(matches); // remove found patterns because they are no longer needed - stringSharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); - stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); + stringSharedBuffer.release( + newComputationState.getPreviousState().getName(), + newComputationState.getEvent(), + newComputationState.getTimestamp()); } else { // add new computation state; it will be processed once the next event arrives computationStates.add(newComputationState); @@ -332,23 +330,29 @@ private boolean isSelfIgnore(final StateTransition edge) { /** * Computes the next computation states based on the given computation state, the current event, * its timestamp and the internal state machine. The algorithm is: - * - * 1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges} - * 2. Perform transitions: - * a) IGNORE (links in {@link SharedBuffer} will still point to the previous event) - * - do not perform for Start State - special case - * - if stays in the same state increase the current stage for future use with number of - * outgoing edges - * - if after PROCEED increase current stage and add new stage (as we change the state) - * - lock the entry in {@link SharedBuffer} as it is needed in the created branch - * b) TAKE (links in {@link SharedBuffer} will point to the current event) - * - add entry to the shared buffer with version of the current computation state - * - add stage and then increase with number of takes for the future computation states - * - peek to the next state if it has PROCEED path to a Final State, if true create - * Final ComputationState to emit results - * 3. Handle the Start State, as it always have to remain - * 4. Release the corresponding entries in {@link SharedBuffer}. - * + *
    + *
  1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges}
  2. + *
  3. Perform transitions: + *
      + *
    1. IGNORE (links in {@link SharedBuffer} will still point to the previous event)
    2. + *
        + *
      • do not perform for Start State - special case
      • + *
      • if stays in the same state increase the current stage for future use with number of outgoing edges
      • + *
      • if after PROCEED increase current stage and add new stage (as we change the state)
      • + *
      • lock the entry in {@link SharedBuffer} as it is needed in the created branch
      • + *
      + *
    3. TAKE (links in {@link SharedBuffer} will point to the current event)
    4. + *
        + *
      • add entry to the shared buffer with version of the current computation state
      • + *
      • add stage and then increase with number of takes for the future computation states
      • + *
      • peek to the next state if it has PROCEED path to a Final State, if true create Final + * ComputationState to emit results
      • + *
      + *
    + *
  4. + *
  5. Handle the Start State, as it always have to remain
  6. + *
  7. Release the corresponding entries in {@link SharedBuffer}.
  8. + *
* * @param computationState Current computation state * @param event Current event which is processed @@ -387,85 +391,81 @@ private Collection> computeNextStates( ignoreBranchesToVisit--; } - resultingComputationStates.add( - ComputationState.createState( + addComputationState( + resultingComputationStates, edge.getTargetState(), computationState.getPreviousState(), computationState.getEvent(), computationState.getTimestamp(), version, computationState.getStartTimestamp() - ) ); - stringSharedBuffer.lock( - computationState.getPreviousState().getName(), - computationState.getEvent(), - computationState.getTimestamp()); } } break; case TAKE: - final State newState = edge.getTargetState(); - final State consumingState = edge.getSourceState(); - final State previousEventState = computationState.getPreviousState(); + final State nextState = edge.getTargetState(); + final State currentState = edge.getSourceState(); + final State previousState = computationState.getPreviousState(); final T previousEvent = computationState.getEvent(); - final DeweyNumber currentVersion = computationState.getVersion(); - final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit); + final DeweyNumber currentVersion = computationState.getVersion(); + final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit); takeBranchesToVisit--; final long startTimestamp; if (computationState.isStartState()) { startTimestamp = timestamp; stringSharedBuffer.put( - consumingState.getName(), + currentState.getName(), event, timestamp, currentVersion); } else { startTimestamp = computationState.getStartTimestamp(); stringSharedBuffer.put( - consumingState.getName(), + currentState.getName(), event, timestamp, - previousEventState.getName(), + previousState.getName(), previousEvent, computationState.getTimestamp(), currentVersion); } - // a new computation state is referring to the shared entry - stringSharedBuffer.lock(consumingState.getName(), event, timestamp); - - resultingComputationStates.add(ComputationState.createState( - newState, - consumingState, - event, - timestamp, - newComputationStateVersion, - startTimestamp - )); + addComputationState( + resultingComputationStates, + nextState, + currentState, + event, + timestamp, + nextVersion, + startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(newState, event); + final State finalState = findFinalStateAfterProceed(nextState, event, computationState); if (finalState != null) { - stringSharedBuffer.lock(consumingState.getName(), event, timestamp); - resultingComputationStates.add(ComputationState.createState( - finalState, - consumingState, - event, - timestamp, - newComputationStateVersion, - startTimestamp)); + addComputationState( + resultingComputationStates, + finalState, + currentState, + event, + timestamp, + nextVersion, + startTimestamp); } break; } } if (computationState.isStartState()) { - final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches()); - final ComputationState startState = createStartState(computationState, totalBranches); + int totalBranches = calculateIncreasingSelfState( + outgoingEdges.getTotalIgnoreBranches(), + outgoingEdges.getTotalTakeBranches()); + + DeweyNumber startVersion = computationState.getVersion().increase(totalBranches); + ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion); resultingComputationStates.add(startState); } @@ -475,17 +475,26 @@ private Collection> computeNextStates( computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp()); - // try to remove unnecessary shared buffer entries - stringSharedBuffer.remove( - computationState.getPreviousState().getName(), - computationState.getEvent(), - computationState.getTimestamp()); } return resultingComputationStates; } - private State findFinalStateAfterProceed(State state, T event) { + private void addComputationState( + List> computationStates, + State currentState, + State previousState, + T event, + long timestamp, + DeweyNumber version, + long startTimestamp) { + ComputationState computationState = ComputationState.createState( + this, currentState, previousState, event, timestamp, version, startTimestamp); + computationStates.add(computationState); + stringSharedBuffer.lock(previousState.getName(), event, timestamp); + } + + private State findFinalStateAfterProceed(State state, T event, ComputationState computationState) { final Stack> statesToCheck = new Stack<>(); statesToCheck.push(state); @@ -494,7 +503,7 @@ private State findFinalStateAfterProceed(State state, T event) { final State currentState = statesToCheck.pop(); for (StateTransition transition : currentState.getStateTransitions()) { if (transition.getAction() == StateTransitionAction.PROCEED && - checkFilterCondition(transition.getCondition(), event)) { + checkFilterCondition(computationState, transition.getCondition(), event)) { if (transition.getTargetState().isFinal()) { return transition.getTargetState(); } else { @@ -514,15 +523,12 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1; } - private ComputationState createStartState(final ComputationState computationState, final int totalBranches) { - final DeweyNumber startVersion = computationState.getVersion().increase(totalBranches); - return ComputationState.createStartState(computationState.getState(), startVersion); - } - private OutgoingEdges createDecisionGraph(ComputationState computationState, T event) { + final OutgoingEdges outgoingEdges = new OutgoingEdges<>(computationState.getState()); + final Stack> states = new Stack<>(); states.push(computationState.getState()); - final OutgoingEdges outgoingEdges = new OutgoingEdges<>(computationState.getState()); + //First create all outgoing edges, so to be able to reason about the Dewey version while (!states.isEmpty()) { State currentState = states.pop(); @@ -531,7 +537,7 @@ private OutgoingEdges createDecisionGraph(ComputationState computationStat // check all state transitions for each state for (StateTransition stateTransition : stateTransitions) { try { - if (checkFilterCondition(stateTransition.getCondition(), event)) { + if (checkFilterCondition(computationState, stateTransition.getCondition(), event)) { // filter condition is true switch (stateTransition.getAction()) { case PROCEED: @@ -553,9 +559,38 @@ private OutgoingEdges createDecisionGraph(ComputationState computationStat return outgoingEdges; } + private boolean checkFilterCondition(ComputationState computationState, IterativeCondition condition, T event) throws Exception { + return condition == null || condition.filter(event, computationState.getConditionContext()); + } + + Map> extractCurrentMatches(final ComputationState computationState) { + if (computationState.getPreviousState() == null) { + return new HashMap<>(); + } + + Collection> paths = stringSharedBuffer.extractPatterns( + computationState.getPreviousState().getName(), + computationState.getEvent(), + computationState.getTimestamp(), + computationState.getVersion()); - private boolean checkFilterCondition(FilterFunction condition, T event) throws Exception { - return condition == null || condition.filter(event); + // for a given computation state, we cannot have more than one matching patterns. + Preconditions.checkArgument(paths.size() <= 1); + + TypeSerializer serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); + + Map> result = new HashMap<>(); + for (LinkedHashMultimap path: paths) { + for (String key: path.keySet()) { + Set events = path.get(key); + List values = new ArrayList<>(events.size()); + for (T event: events) { + values.add(serializer.isImmutableType() ? event : serializer.copy(event)); + } + result.put(key, values); + } + } + return result; } /** @@ -573,6 +608,9 @@ private Collection> extractPatternMatches(final ComputationState< computationState.getTimestamp(), computationState.getVersion()); + // for a given computation state, we cannot have more than one matching patterns. + Preconditions.checkArgument(paths.size() <= 1); + List> result = new ArrayList<>(); TypeSerializer serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); @@ -601,6 +639,28 @@ private Collection> extractPatternMatches(final ComputationState< return result; } + /** + * Generates a state name from a given name template and an index. + *

+ * If the template ends with "[]" the index is inserted in between the square brackets. + * Otherwise, an underscore and the index is appended to the name. + * + * @param name Name template + * @param index Index of the state + * @return Generated state name from the given state name template + */ + static String generateStateName(final String name, final int index) { + Matcher matcher = namePattern.matcher(name); + + if (matcher.matches()) { + return matcher.group(1) + index + matcher.group(2); + } else { + return name + "_" + index; + } + } + + ////////////////////// Fault-Tolerance / Migration ////////////////////// + private void writeObject(ObjectOutputStream oos) throws IOException { oos.defaultWriteObject(); @@ -692,6 +752,7 @@ public boolean apply(@Nullable StateTransition input) { final State previousState = convertedStates.get(previousName); computationStates.add(ComputationState.createState( + this, convertedStates.get(currentName), previousState, readState.getEvent(), @@ -710,6 +771,7 @@ public boolean apply(@Nullable State input) { }).getName(); computationStates.add(ComputationState.createStartState( + this, convertedStates.get(startName), new DeweyNumber(this.startEventCounter))); @@ -761,32 +823,13 @@ private ComputationState readComputationState(ObjectInputStream ois) throws I event = null; } - return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp); + return ComputationState.createState(this, state, previousState, event, timestamp, version, startTimestamp); } + ////////////////////// Serialization ////////////////////// /** - * Generates a state name from a given name template and an index. - *

- * If the template ends with "[]" the index is inserted in between the square brackets. - * Otherwise, an underscore and the index is appended to the name. - * - * @param name Name template - * @param index Index of the state - * @return Generated state name from the given state name template - */ - static String generateStateName(final String name, final int index) { - Matcher matcher = namePattern.matcher(name); - - if (matcher.matches()) { - return matcher.group(1) + index + matcher.group(2); - } else { - return name + "_" + index; - } - } - - /** - * {@link TypeSerializer} for {@link NFA} that uses Java Serialization. + * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization. */ public static class Serializer extends TypeSerializer> { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index d5b78763cc5ab..ccc68849473a1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -52,12 +52,14 @@ * * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". * - * @see https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf + * @see + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf * * @param Type of the keys * @param Type of the values */ public class SharedBuffer implements Serializable { + private static final long serialVersionUID = 9213251042562206495L; private final TypeSerializer valueSerializer; @@ -66,20 +68,20 @@ public class SharedBuffer implements Serializable { public SharedBuffer(final TypeSerializer valueSerializer) { this.valueSerializer = valueSerializer; - pages = new HashMap<>(); + this.pages = new HashMap<>(); } /** * Stores given value (value + timestamp) under the given key. It assigns a preceding element * relation to the entry which is defined by the previous key, value (value + timestamp). * - * @param key Key of the current value - * @param value Current value - * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable)) - * @param previousKey Key of the value for the previous relation - * @param previousValue Value for the previous relation + * @param key Key of the current value + * @param value Current value + * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable)) + * @param previousKey Key of the value for the previous relation + * @param previousValue Value for the previous relation * @param previousTimestamp Timestamp of the value for the previous relation - * @param version Version of the previous relation + * @param version Version of the previous relation */ public void put( final K key, @@ -89,14 +91,6 @@ public void put( final V previousValue, final long previousTimestamp, final DeweyNumber version) { - SharedBufferPage page; - - if (!pages.containsKey(key)) { - page = new SharedBufferPage(key); - pages.put(key, page); - } else { - page = pages.get(key); - } final SharedBufferEntry previousSharedBufferEntry = get(previousKey, previousValue, previousTimestamp); @@ -108,55 +102,41 @@ public void put( "relation has been already pruned, even though you expect it to be still there."); } - page.add( - new ValueTimeWrapper<>(value, timestamp), - previousSharedBufferEntry, - version); + put(key, value, timestamp, previousSharedBufferEntry, version); } /** * Stores given value (value + timestamp) under the given key. It assigns no preceding element * relation to the entry. * - * @param key Key of the current value - * @param value Current value + * @param key Key of the current value + * @param value Current value * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable)) - * @param version Version of the previous relation + * @param version Version of the previous relation */ public void put( - final K key, - final V value, - final long timestamp, - final DeweyNumber version) { - SharedBufferPage page; - - if (!pages.containsKey(key)) { - page = new SharedBufferPage(key); - pages.put(key, page); - } else { - page = pages.get(key); - } + final K key, + final V value, + final long timestamp, + final DeweyNumber version) { - page.add( - new ValueTimeWrapper<>(value, timestamp), - null, - version); + put(key, value, timestamp, null, version); } - /** - * Checks whether the given key, value, timestamp triple is contained in the shared buffer - * - * @param key Key of the value - * @param value Value - * @param timestamp Timestamp of the value - * @return Whether a value with the given timestamp is registered under the given key - */ - public boolean contains( - final K key, - final V value, - final long timestamp) { + private void put( + final K key, + final V value, + final long timestamp, + final SharedBufferEntry previousSharedBufferEntry, + final DeweyNumber version) { - return pages.containsKey(key) && pages.get(key).contains(new ValueTimeWrapper<>(value, timestamp)); + SharedBufferPage page = pages.get(key); + if (page == null) { + page = new SharedBufferPage<>(key); + pages.put(key, page); + } + + page.add(new ValueTimeWrapper<>(value, timestamp), previousSharedBufferEntry, version); } public boolean isEmpty() { @@ -272,47 +252,29 @@ public Collection> extractPatterns( * Increases the reference counter for the given value, key, timestamp entry so that it is not * accidentally removed. * - * @param key Key of the value to lock - * @param value Value to lock + * @param key Key of the value to lock + * @param value Value to lock * @param timestamp Timestamp of the value to lock */ public void lock(final K key, final V value, final long timestamp) { SharedBufferEntry entry = get(key, value, timestamp); - if (entry != null) { entry.increaseReferenceCounter(); } } /** - * Decreases the reference counter for the given value, key, timstamp entry so that it can be + * Decreases the reference counter for the given value, key, timestamp entry so that it can be * removed once the reference counter reaches 0. * - * @param key Key of the value to release - * @param value Value to release + * @param key Key of the value to release + * @param value Value to release * @param timestamp Timestamp of the value to release */ public void release(final K key, final V value, final long timestamp) { SharedBufferEntry entry = get(key, value, timestamp); - - if (entry != null ) { - entry.decreaseReferenceCounter(); - } - } - - /** - * Removes the given value, key, timestamp entry if its reference counter is 0. It will also - * release the next element in its previous relation and apply remove to this element - * recursively. - * - * @param key Key of the value to remove - * @param value Value to remove - * @param timestamp Timestamp of the value to remvoe - */ - public void remove(final K key, final V value, final long timestamp) { - SharedBufferEntry entry = get(key, value, timestamp); - if (entry != null) { + entry.decreaseReferenceCounter(); internalRemove(entry); } } @@ -626,10 +588,6 @@ public void add(final ValueTimeWrapper valueTime, final SharedBufferEntry valueTime) { - return entries.containsKey(valueTime); - } - public SharedBufferEntry get(final ValueTimeWrapper valueTime) { return entries.get(valueTime); } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java index 27e0dcd2784ec..c673576386db1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java @@ -18,7 +18,7 @@ package org.apache.flink.cep.nfa; -import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; import java.io.IOException; import java.io.ObjectInputStream; @@ -66,29 +66,29 @@ public Collection> getStateTransitions() { } private void addStateTransition( - final StateTransitionAction action, - final State targetState, - final FilterFunction condition) { + final StateTransitionAction action, + final State targetState, + final IterativeCondition condition) { stateTransitions.add(new StateTransition(this, action, targetState, condition)); } - public void addIgnore(final FilterFunction condition) { + public void addIgnore(final IterativeCondition condition) { addStateTransition(StateTransitionAction.IGNORE, this, condition); } - public void addIgnore(final State targetState,final FilterFunction condition) { + public void addIgnore(final State targetState,final IterativeCondition condition) { addStateTransition(StateTransitionAction.IGNORE, targetState, condition); } - public void addTake(final State targetState, final FilterFunction condition) { + public void addTake(final State targetState, final IterativeCondition condition) { addStateTransition(StateTransitionAction.TAKE, targetState, condition); } - public void addProceed(final State targetState, final FilterFunction condition) { + public void addProceed(final State targetState, final IterativeCondition condition) { addStateTransition(StateTransitionAction.PROCEED, targetState, condition); } - public void addTake(final FilterFunction condition) { + public void addTake(final IterativeCondition condition) { addStateTransition(StateTransitionAction.TAKE, this, condition); } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java index e3c7b7a24291c..f80edfc1fc312 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java @@ -19,6 +19,8 @@ package org.apache.flink.cep.nfa; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import java.io.Serializable; import java.util.Objects; @@ -29,17 +31,24 @@ public class StateTransition implements Serializable { private final StateTransitionAction action; private final State sourceState; private final State targetState; - private final FilterFunction condition; + private IterativeCondition newCondition; + + /** + * @deprecated This field remains for backwards compatibility. + * Now the conditions extend the {@link IterativeCondition}. + */ + @Deprecated + private FilterFunction condition; public StateTransition( - final State sourceState, - final StateTransitionAction action, - final State targetState, - final FilterFunction condition) { + final State sourceState, + final StateTransitionAction action, + final State targetState, + final IterativeCondition condition) { this.action = action; this.targetState = targetState; this.sourceState = sourceState; - this.condition = condition; + this.newCondition = condition; } public StateTransitionAction getAction() { @@ -54,8 +63,12 @@ public State getSourceState() { return sourceState; } - public FilterFunction getCondition() { - return condition; + public IterativeCondition getCondition() { + if (condition != null) { + this.newCondition = new FilterWrapper<>(condition); + this.condition = null; + } + return newCondition; } @Override @@ -87,7 +100,7 @@ public String toString() { .append(sourceState.getName()).append(", ") .append(targetState.getName()); - if (condition != null) { + if (newCondition != null) { builder.append(", with filter)"); } else { builder.append(")"); @@ -95,4 +108,24 @@ public String toString() { return builder.toString(); } + + /** + * A wrapper to transform a {@link FilterFunction} into a {@link SimpleCondition}. + * This is used only when migrating from an older Flink version. + */ + private static class FilterWrapper extends SimpleCondition { + + private static final long serialVersionUID = -4973016745698940430L; + + private final FilterFunction filterFunction; + + FilterWrapper(FilterFunction filterFunction) { + this.filterFunction = filterFunction; + } + + @Override + public boolean filter(T value) throws Exception { + return filterFunction.filter(value); + } + } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 8bd86128ba3f6..4fb918fd6e56a 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -21,19 +21,19 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.State; import org.apache.flink.cep.nfa.StateTransition; import org.apache.flink.cep.nfa.StateTransitionAction; -import org.apache.flink.cep.pattern.FilterFunctions; +import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.FollowedByPattern; import org.apache.flink.cep.pattern.MalformedPatternException; -import org.apache.flink.cep.pattern.NotFilterFunction; +import org.apache.flink.cep.pattern.conditions.NotCondition; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.Quantifier; import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.streaming.api.windowing.time.Time; import javax.annotation.Nullable; @@ -240,7 +240,7 @@ private State createStartState(State sinkState) { /** * Converts the given state into a "complex" state consisting of given number of states with - * same {@link FilterFunction} + * same {@link IterativeCondition} * * @param sourceState the state to be converted * @param sinkState the state that the converted state should point to @@ -271,8 +271,9 @@ private State convertToTimesState(final State sourceState, final State @SuppressWarnings("unchecked") private void convertToSingletonState(final State sourceState, final State sinkState) { - final FilterFunction currentFilterFunction = (FilterFunction) currentPattern.getFilterFunction(); - final FilterFunction trueFunction = FilterFunctions.trueFunction(); + final IterativeCondition currentFilterFunction = (IterativeCondition) currentPattern.getCondition(); + final IterativeCondition trueFunction = BooleanConditions.trueFunction(); + sourceState.addTake(sinkState, currentFilterFunction); if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) { @@ -303,16 +304,13 @@ private void convertToSingletonState(final State sourceState, final State @SuppressWarnings("unchecked") private State createFirstMandatoryStateOfLoop(final State sinkState, final State.StateType stateType) { - final FilterFunction currentFilterFunction = (FilterFunction) currentPattern.getFilterFunction(); + final IterativeCondition currentFilterFunction = (IterativeCondition) currentPattern.getCondition(); final State firstState = new State<>(currentPattern.getName(), stateType); firstState.addTake(sinkState, currentFilterFunction); if (currentPattern instanceof FollowedByPattern) { - if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) { - firstState.addIgnore(new NotFilterFunction<>(currentFilterFunction)); - } else { - firstState.addIgnore(FilterFunctions.trueFunction()); - } + final IterativeCondition ignoreCondition = getIgnoreCondition(currentPattern); + firstState.addIgnore(ignoreCondition); } return firstState; } @@ -332,8 +330,8 @@ private State createFirstMandatoryStateOfLoop(final State sinkState, final @SuppressWarnings("unchecked") private void convertToLooping(final State sourceState, final State sinkState, boolean isFirstState) { - final FilterFunction filterFunction = (FilterFunction) currentPattern.getFilterFunction(); - final FilterFunction trueFunction = FilterFunctions.trueFunction(); + final IterativeCondition filterFunction = (IterativeCondition) currentPattern.getCondition(); + final IterativeCondition trueFunction = BooleanConditions.trueFunction(); sourceState.addProceed(sinkState, trueFunction); sourceState.addTake(filterFunction); @@ -342,13 +340,7 @@ private void convertToLooping(final State sourceState, final State sinkSta currentPattern.getName(), State.StateType.Normal); - - final FilterFunction ignoreCondition; - if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) { - ignoreCondition = new NotFilterFunction<>(filterFunction); - } else { - ignoreCondition = trueFunction; - } + final IterativeCondition ignoreCondition = getIgnoreCondition(currentPattern); sourceState.addIgnore(ignoreState, ignoreCondition); ignoreState.addTake(sourceState, filterFunction); @@ -368,6 +360,19 @@ private void convertToLooping(final State sourceState, final State sinkSta private void convertToLooping(final State sourceState, final State sinkState) { convertToLooping(sourceState, sinkState, false); } + + /** + * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge + * that corresponds to the specified {@link Pattern}. If the pattern is + * {@link QuantifierProperty#EAGER}, the negated user-specified condition is + * returned. In other case, a condition that always evaluated to {@code true} is + * returned. + */ + private IterativeCondition getIgnoreCondition(Pattern pattern) { + return pattern.getQuantifier().hasProperty(QuantifierProperty.EAGER) + ? new NotCondition<>((IterativeCondition) pattern.getCondition()) + : BooleanConditions.trueFunction(); + } } /** diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java index ecaee07ea86d7..a7391d5d17460 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java @@ -21,11 +21,15 @@ import org.apache.flink.api.common.functions.FilterFunction; /** - * A filter function which combines two filter functions with a logical and. Thus, the filter + * @deprecated This is only used when migrating from an older Flink version. + * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead. + * + *

A filter function which combines two filter functions with a logical and. Thus, the filter * function only returns true, iff both filters return true. * * @param Type of the element to filter */ +@Deprecated public class AndFilterFunction implements FilterFunction { private static final long serialVersionUID = -2109562093871155005L; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java index c42ecb13598b6..3620cae832ccd 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java @@ -21,11 +21,15 @@ import org.apache.flink.api.common.functions.FilterFunction; /** - * A filter function which combines two filter functions with a logical or. Thus, the filter + * @deprecated This is only used when migrating from an older Flink version. + * Use the {@link org.apache.flink.cep.pattern.conditions.OrCondition} instead. + * + *

A filter function which combines two filter functions with a logical or. Thus, the filter * function only returns true, iff at least one of the filter functions holds true. * * @param Type of the element to filter */ +@Deprecated public class OrFilterFunction implements FilterFunction { private static final long serialVersionUID = -2109562093871155005L; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 7b4d9c7b1de87..cd5178838c5ab 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -18,9 +18,12 @@ package org.apache.flink.cep.pattern; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.pattern.conditions.AndCondition; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.OrCondition; +import org.apache.flink.cep.pattern.conditions.SubtypeCondition; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Preconditions; @@ -49,7 +52,7 @@ public class Pattern { private final Pattern previous; // filter condition for an event to be matched - private FilterFunction filterFunction; + private IterativeCondition condition; // window length in which the pattern match has to occur private Time windowTime; @@ -71,8 +74,8 @@ public String getName() { return previous; } - public FilterFunction getFilterFunction() { - return filterFunction; + public IterativeCondition getCondition() { + return condition; } public Time getWindowTime() { @@ -90,36 +93,34 @@ public int getTimes() { /** * Specifies a filter condition which has to be fulfilled by an event in order to be matched. * - * @param newFilterFunction Filter condition + * @param condition Filter condition * @return The same pattern operator where the new filter condition is set */ - public Pattern where(FilterFunction newFilterFunction) { - ClosureCleaner.clean(newFilterFunction, true); + public Pattern where(IterativeCondition condition) { + ClosureCleaner.clean(condition, true); - if (this.filterFunction == null) { - this.filterFunction = newFilterFunction; + if (this.condition == null) { + this.condition = condition; } else { - this.filterFunction = new AndFilterFunction(this.filterFunction, newFilterFunction); + this.condition = new AndCondition<>(this.condition, condition); } - return this; } /** * Specifies a filter condition which is OR'ed with an existing filter function. * - * @param orFilterFunction OR filter condition + * @param condition OR filter condition * @return The same pattern operator where the new filter condition is set */ - public Pattern or(FilterFunction orFilterFunction) { - ClosureCleaner.clean(orFilterFunction, true); + public Pattern or(IterativeCondition condition) { + ClosureCleaner.clean(condition, true); - if (this.filterFunction == null) { - this.filterFunction = orFilterFunction; + if (this.condition == null) { + this.condition = condition; } else { - this.filterFunction = new OrFilterFunction<>(this.filterFunction, orFilterFunction); + this.condition = new OrCondition<>(this.condition, condition); } - return this; } @@ -132,10 +133,11 @@ public Pattern or(FilterFunction orFilterFunction) { * @return The same pattern operator with the new subtype constraint */ public Pattern subtype(final Class subtypeClass) { - if (filterFunction == null) { - this.filterFunction = new SubtypeFilterFunction(subtypeClass); + if (condition == null) { + this.condition = new SubtypeCondition(subtypeClass); } else { - this.filterFunction = new AndFilterFunction(this.filterFunction, new SubtypeFilterFunction(subtypeClass)); + this.condition = new AndCondition<>(this.condition, + new SubtypeCondition(subtypeClass)); } @SuppressWarnings("unchecked") diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java index f183f0f57e74f..ae48df36d627e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java @@ -21,11 +21,15 @@ import org.apache.flink.api.common.functions.FilterFunction; /** - * A filter function which filters elements of the given type. A element if filtered out iff it + * @deprecated This is only used when migrating from an older Flink version. + * Use the {@link org.apache.flink.cep.pattern.conditions.SubtypeCondition} instead. + * + *

A filter function which filters elements of the given type. A element if filtered out iff it * is not assignable to the given subtype of T. * * @param Type of the elements to be filtered */ +@Deprecated public class SubtypeFilterFunction implements FilterFunction { private static final long serialVersionUID = -2990017519957561355L; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java new file mode 100644 index 0000000000000..5df7c669f813a --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link IterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class AndCondition extends IterativeCondition { + + private static final long serialVersionUID = -2471892317390197319L; + + private final IterativeCondition left; + private final IterativeCondition right; + + public AndCondition(final IterativeCondition left, final IterativeCondition right) { + this.left = left; + this.right = right; + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + return left.filter(value, ctx) && right.filter(value, ctx); + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition getLeft() { + return left; + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition getRight() { + return right; + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java similarity index 57% rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java rename to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java index 12e58ba33de80..d67b40789080d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java @@ -15,17 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.cep.pattern; +package org.apache.flink.cep.pattern.conditions; -import org.apache.flink.api.common.functions.FilterFunction; - -public class FilterFunctions { +/** + * Utility class containing an {@link IterativeCondition} that always returns + * {@code true} and one that always returns {@code false}. + */ +public class BooleanConditions { - private FilterFunctions() { - } + /** + * @return An {@link IterativeCondition} that always returns {@code true}. + */ + public static IterativeCondition trueFunction() { + return new SimpleCondition() { + private static final long serialVersionUID = 8379409657655181451L; - public static FilterFunction trueFunction() { - return new FilterFunction() { @Override public boolean filter(T value) throws Exception { return true; @@ -33,8 +37,13 @@ public boolean filter(T value) throws Exception { }; } - public static FilterFunction falseFunction() { - return new FilterFunction() { + /** + * @return An {@link IterativeCondition} that always returns {@code false}. + */ + public static IterativeCondition falseFunction() { + return new SimpleCondition() { + private static final long serialVersionUID = -823981593720949910L; + @Override public boolean filter(T value) throws Exception { return false; diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java new file mode 100644 index 0000000000000..f225e01ae7d5f --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * A user-defined condition that decides if an element should be accepted in the pattern or not. + * Accepting an element also signals a state transition for the corresponding {@link org.apache.flink.cep.nfa.NFA}. + * + *

A condition can be a simple filter or a more complex condition that iterates over the previously accepted + * elements in the pattern and decides to accept a new element or not based on some statistic over these elements. + * In the former case, the condition should extend the {@link SimpleCondition} class. In the later, the condition + * should extend this class, which gives you also access to the previously accepted elements through a {@link Context}. + * + *

An iterative condition that accepts an element if i) its name is middle, and ii) the sum of the prices of all + * accepted elements is less than {@code 5} would look like: + * + *

+ * {@code
+ * private class MyCondition extends IterativeCondition {
+ *
+ * 		@Override
+ *     	public boolean filter(Event value, Context ctx) throws Exception {
+ *     		if (!value.getName().equals("middle")) {
+ *     			return false;
+ *     		}
+ *
+ *     		double sum = 0.0;
+ *     		for (Event e: ctx.getEventsForPattern("middle")) {
+ *     			sum += e.getPrice();
+ *     		}
+ *     		sum += value.getPrice();
+ *     		return Double.compare(sum, 5.0) <= 0;
+ *     	}
+ *    }
+ * }
+ * 
+ * + * ATTENTION: The call to {@link Context#getEventsForPattern(String) getEventsForPattern(...)} has to find + * the elements that belong to the pattern among the elements stored by the NFA. The cost of this operation can vary, + * so when implementing your condition, try to minimize the times the method is called. + */ +public abstract class IterativeCondition implements Function, Serializable { + + private static final long serialVersionUID = 7067817235759351255L; + + /** + * The filter function that evaluates the predicate. + *

+ * IMPORTANT: The system assumes that the function does not + * modify the elements on which the predicate is applied. Violating this assumption + * can lead to incorrect results. + * + * @param value The value to be tested. + * @param ctx The {@link Context} used for the evaluation of the function and provides access to + * the already accepted events in the pattern (see {@link Context#getEventsForPattern(String)}). + * @return {@code true} for values that should be retained, {@code false} + * for values to be filtered out. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + public abstract boolean filter(T value, Context ctx) throws Exception; + + /** + * The context used when evaluating the {@link IterativeCondition condition}. + */ + public interface Context extends Serializable { + + /** + * @return An {@link Iterable} over the already accepted elements + * for a given pattern. Elements are iterated in the order the were + * inserted in the pattern. + * + * @param name The name of the pattern. + */ + Iterable getEventsForPattern(String name); + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java similarity index 67% rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java rename to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java index a4fc8f54c1052..3e6ab56d57ce0 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java @@ -16,27 +16,25 @@ * limitations under the License. */ -package org.apache.flink.cep.pattern; - -import org.apache.flink.api.common.functions.FilterFunction; +package org.apache.flink.cep.pattern.conditions; /** - * A filter function which negates filter function. + * A {@link IterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. * * @param Type of the element to filter */ -public class NotFilterFunction implements FilterFunction { +public class NotCondition extends IterativeCondition { private static final long serialVersionUID = -2109562093871155005L; - private final FilterFunction original; + private final IterativeCondition original; - public NotFilterFunction(final FilterFunction original) { + public NotCondition(final IterativeCondition original) { this.original = original; } @Override - public boolean filter(T value) throws Exception { - return !original.filter(value); + public boolean filter(T value, Context ctx) throws Exception { + return !original.filter(value, ctx); } - } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java new file mode 100644 index 0000000000000..6aaa4bbb15802 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link IterativeCondition condition} which combines two conditions with a logical + * {@code OR} and returns {@code true} if at least one is {@code true}. + * + * @param Type of the element to filter + */ +public class OrCondition extends IterativeCondition { + + private static final long serialVersionUID = 2554610954278485106L; + + private final IterativeCondition left; + private final IterativeCondition right; + + public OrCondition(final IterativeCondition left, final IterativeCondition right) { + this.left = left; + this.right = right; + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + return left.filter(value, ctx) || right.filter(value, ctx); + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition getLeft() { + return left; + } + + /** + * @return One of the {@link IterativeCondition conditions} combined in this condition. + */ + public IterativeCondition getRight() { + return right; + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java new file mode 100644 index 0000000000000..9ca52c5c8d88a --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A user-defined condition that decides if an element should be accepted in the pattern or not. + * Accepting an element also signals a state transition for the corresponding {@link org.apache.flink.cep.nfa.NFA}. + * + *

Contrary to the {@link IterativeCondition}, conditions that extend this class do not have access to the + * previously accepted elements in the pattern. Conditions that extend this class are simple {@code filter(...)} + * functions that decide based on the properties of the element at hand. + */ +public abstract class SimpleCondition extends IterativeCondition implements FilterFunction { + + private static final long serialVersionUID = 4942618239408140245L; + + @Override + public boolean filter(T value, Context ctx) throws Exception { + return filter(value); + } +} diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java new file mode 100644 index 0000000000000..91f6c21ffb55c --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link IterativeCondition condition} which filters elements of the given type. + * An element is filtered out iff it is not assignable to the given subtype of {@code T}. + * + * @param Type of the elements to be filtered + */ +public class SubtypeCondition extends SimpleCondition { + private static final long serialVersionUID = -2990017519957561355L; + + /** The subtype to filter for. */ + private final Class subtype; + + public SubtypeCondition(final Class subtype) { + this.subtype = subtype; + } + + @Override + public boolean filter(T value) throws Exception { + return subtype.isAssignableFrom(value.getClass()); + } +} diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 58870174195d1..42117ee838538 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -18,11 +18,11 @@ package org.apache.flink.cep; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -81,7 +81,7 @@ public void testSimplePatternCEP() throws Exception { new Event(8, "end", 1.0) ); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -89,7 +89,7 @@ public boolean filter(Event value) throws Exception { } }) .followedBy("middle").subtype(SubEvent.class).where( - new FilterFunction() { + new SimpleCondition() { @Override public boolean filter(SubEvent value) throws Exception { @@ -97,7 +97,7 @@ public boolean filter(SubEvent value) throws Exception { } } ) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -156,7 +156,7 @@ public Integer getKey(Event value) throws Exception { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -164,7 +164,7 @@ public boolean filter(Event value) throws Exception { } }) .followedBy("middle").subtype(SubEvent.class).where( - new FilterFunction() { + new SimpleCondition() { @Override public boolean filter(SubEvent value) throws Exception { @@ -172,7 +172,7 @@ public boolean filter(SubEvent value) throws Exception { } } ) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -236,19 +236,19 @@ public Event map(Tuple2 value) throws Exception { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -325,19 +325,19 @@ public Integer getKey(Event value) throws Exception { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -378,7 +378,7 @@ public void testSimplePatternWithSingleState() throws Exception { Pattern, ?> pattern = Pattern.>begin("start") - .where(new FilterFunction>() { + .where(new SimpleCondition>() { @Override public boolean filter(Tuple2 rec) throws Exception { return rec.f1 == 1; @@ -456,19 +456,19 @@ public Event map(Tuple2 value) throws Exception { } }); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { @@ -524,26 +524,26 @@ public void testSimpleOrFilterPatternCEP() throws Exception { ); Pattern pattern = Pattern.begin("start") - .where(new FilterFunction() { + .where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }) .followedBy("middle") - .where(new FilterFunction() { + .where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getPrice() == 2.0; } }) - .or(new FilterFunction() { + .or(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { return value.getPrice() == 5.0; } }) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { @Override public boolean filter(Event value) throws Exception { diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 5b05f191519b9..197767e21329e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -18,20 +18,26 @@ package org.apache.flink.cep.nfa; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.flink.api.common.functions.FilterFunction; +import com.google.common.primitives.Doubles; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -39,7 +45,6 @@ import java.util.Set; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class NFAITCase extends TestLogger { @@ -58,23 +63,21 @@ public void testSimplePatternNFA() { inputEvents.add(new StreamRecord(new Event(43, "start", 1.0), 4)); inputEvents.add(new StreamRecord(endEvent, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }) - .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction() { - private static final long serialVersionUID = 6215754202506583964L; + }).followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; - @Override - public boolean filter(SubEvent value) throws Exception { - return value.getVolume() > 5.0; - } - }) - .followedBy("end").where(new FilterFunction() { + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override @@ -113,14 +116,14 @@ public void testStrictContinuityWithResults() { inputEvents.add(new StreamRecord<>(middleEvent1, 3)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).next("end").where(new FilterFunction() { + }).next("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -163,14 +166,14 @@ public void testStrictContinuityNoResults() { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).next("end").where(new FilterFunction() { + }).next("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -217,21 +220,21 @@ public void testSimplePatternWithTimeWindowNFA() { events.add(new StreamRecord(new Event(6, "end", 1.0), 13)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 7907391379273505897L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = -3268741540234334074L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = -8995174172182138608L; @Override @@ -240,11 +243,12 @@ public boolean filter(Event value) throws Exception { } }).within(Time.milliseconds(10)); - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); for (StreamRecord event: events) { - Collection> patterns = nfa.process(event.getValue(), event.getTimestamp()).f0; + Collection> patterns = nfa.process( + event.getValue(), + event.getTimestamp()).f0; resultingPatterns.addAll(patterns); } @@ -269,7 +273,6 @@ public void testSimplePatternWithTimeoutHandling() { Set, Long>> resultingTimeoutPatterns = new HashSet<>(); Set, Long>> expectedTimeoutPatterns = new HashSet<>(); - events.add(new StreamRecord(new Event(1, "start", 1.0), 1)); events.add(new StreamRecord(new Event(2, "start", 1.0), 2)); events.add(new StreamRecord(new Event(3, "middle", 1.0), 3)); @@ -296,21 +299,21 @@ public void testSimplePatternWithTimeoutHandling() { expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern3, 11L)); expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 13L)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 7907391379273505897L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = -3268741540234334074L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new FilterFunction() { + }).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = -8995174172182138608L; @Override @@ -319,7 +322,6 @@ public boolean filter(Event value) throws Exception { } }).within(Time.milliseconds(10)); - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true); for (StreamRecord event: events) { @@ -359,38 +361,35 @@ public void testBranchingPattern() { inputEvents.add(new StreamRecord(nextOne2, 7)); inputEvents.add(new StreamRecord(endEvent, 8)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }) - .followedBy("middle-first").subtype(SubEvent.class).where(new FilterFunction() { - private static final long serialVersionUID = 6215754202506583964L; + }).followedBy("middle-first").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; - @Override - public boolean filter(SubEvent value) throws Exception { - return value.getVolume() > 5.0; - } - }) - .followedBy("middle-second").subtype(SubEvent.class).where(new FilterFunction() { - private static final long serialVersionUID = 6215754202506583964L; + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }).followedBy("middle-second").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; - @Override - public boolean filter(SubEvent value) throws Exception { - return value.getName().equals("next-one"); - } - }) - .followedBy("end").where(new FilterFunction() { - private static final long serialVersionUID = 7056763917392056548L; + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("next-one"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7056763917392056548L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -443,44 +442,42 @@ public void testComplexBranchingAfterZeroOrMore() { inputEvents.add(new StreamRecord<>(end3, 8)); inputEvents.add(new StreamRecord<>(end4, 9)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } - }) - .followedBy("end2").where(new FilterFunction() { - private static final long serialVersionUID = 5726188262756267490L; + }).followedBy("end2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("d"); - } - }) - .followedBy("end3").where(new FilterFunction() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("end3").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("e"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + }); NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -533,21 +530,21 @@ public void testZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -597,21 +594,21 @@ public void testEagerZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(true).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(true).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -660,14 +657,14 @@ public void testBeginWithZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end, 6)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore().followedBy("end").where(new FilterFunction() { + }).zeroOrMore().followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -720,28 +717,28 @@ public void testZeroOrMoreAfterZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle-first").where(new FilterFunction() { + }).followedBy("middle-first").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("middle-second").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("middle-second").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).zeroOrMore(false).followedBy("end").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -799,35 +796,35 @@ public void testZeroOrMoreAfterBranching() { inputEvents.add(new StreamRecord<>(kleene2, 7)); inputEvents.add(new StreamRecord<>(end, 8)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("branching").where(new FilterFunction() { + }).followedBy("branching").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).followedBy("merging").where(new FilterFunction() { + }).followedBy("merging").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("f"); } - }).followedBy("kleene").where(new FilterFunction() { + }).followedBy("kleene").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).zeroOrMore(false).followedBy("end").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -881,14 +878,14 @@ public void testStrictContinuityNoResultsAfterZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent3, 4)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -896,7 +893,7 @@ public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).zeroOrMore() - .next("end").where(new FilterFunction() { + .next("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -909,7 +906,6 @@ public boolean filter(Event value) throws Exception { Set> resultingPatterns = new HashSet<>(); - for (StreamRecord inputEvent : inputEvents) { Collection> patterns = nfa.process( inputEvent.getValue(), @@ -937,29 +933,28 @@ public void testStrictContinuityResultsAfterZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent2, 3)); inputEvents.add(new StreamRecord<>(end, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false) - .next("end").where(new FilterFunction() { - private static final long serialVersionUID = 5726188262756267490L; + }).zeroOrMore(false).next("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -998,21 +993,21 @@ public void testAtLeastOne() { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end1").where(new FilterFunction() { + }).oneOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1059,14 +1054,14 @@ public void testBeginWithAtLeastOne() { inputEvents.add(new StreamRecord<>(startEvent3, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end").where(new FilterFunction() { + }).oneOrMore(false).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1120,21 +1115,21 @@ public void testNextZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent3, 5L)); inputEvents.add(new StreamRecord<>(endEvent, 6L)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).next("middle").where(new FilterFunction() { + }).next("middle").where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).zeroOrMore(false).followedBy("end").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override @@ -1181,21 +1176,21 @@ public void testAtLeastOneEager() { inputEvents.add(new StreamRecord<>(middleEvent3, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(true).followedBy("end1").where(new FilterFunction() { + }).oneOrMore(true).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1240,21 +1235,21 @@ public void testOptional() { inputEvents.add(new StreamRecord<>(middleEvent, 5)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).optional().followedBy("end1").where(new FilterFunction() { + }).optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1303,21 +1298,21 @@ public void testTimes() { inputEvents.add(new StreamRecord<>(middleEvent3, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).next("middle").where(new FilterFunction() { + }).next("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).followedBy("end1").where(new FilterFunction() { + }).times(2).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1362,14 +1357,14 @@ public void testStartWithTimes() { inputEvents.add(new StreamRecord<>(middleEvent3, 4)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("middle").where(new FilterFunction() { + Pattern pattern = Pattern.begin("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).followedBy("end1").where(new FilterFunction() { + }).times(2).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1411,14 +1406,14 @@ public void testStartWithOptional() { inputEvents.add(new StreamRecord<>(startEvent, 1)); inputEvents.add(new StreamRecord<>(end1, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).optional().followedBy("end1").where(new FilterFunction() { + }).optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1464,14 +1459,14 @@ public void testEndWithZeroOrMore() { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(middleEvent3, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1525,7 +1520,7 @@ public void testStartAndEndWithZeroOrMore() { inputEvents.add(new StreamRecord<>(end2, 6)); inputEvents.add(new StreamRecord<>(end3, 6)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1571,14 +1566,14 @@ public void testEndWithOptional() { inputEvents.add(new StreamRecord<>(startEvent, 1)); inputEvents.add(new StreamRecord<>(middleEvent1, 3)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1624,14 +1619,14 @@ public void testEndWithOneOrMore() { inputEvents.add(new StreamRecord<>(middleEvent2, 4)); inputEvents.add(new StreamRecord<>(middleEvent3, 5)); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1676,21 +1671,21 @@ public void testTimesClearingBuffer() { Event middleEvent3 = new Event(43, "a", 4.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).next("middle").where(new FilterFunction() { + }).next("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).times(2).followedBy("end1").where(new FilterFunction() { + }).times(2).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1719,21 +1714,21 @@ public void testOptionalClearingBuffer() { Event middleEvent = new Event(43, "a", 4.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).optional().followedBy("end1").where(new FilterFunction() { + }).optional().followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1761,21 +1756,21 @@ public void testAtLeastOneClearingBuffer() { Event middleEvent2 = new Event(42, "a", 3.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end1").where(new FilterFunction() { + }).oneOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1805,21 +1800,21 @@ public void testZeroOrMoreClearingBuffer() { Event middleEvent2 = new Event(42, "a", 3.0); Event end1 = new Event(44, "b", 5.0); - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } - }).followedBy("middle").where(new FilterFunction() { + }).followedBy("middle").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new FilterFunction() { + }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1841,4 +1836,459 @@ public boolean filter(Event value) throws Exception { assertEquals(true, nfa.isEmpty()); } + + ////////////////////// Iterative BooleanConditions ///////////////////////// + + private final Event startEvent1 = new Event(40, "start", 1.0); + private final Event startEvent2 = new Event(40, "start", 2.0); + private final Event startEvent3 = new Event(40, "start", 3.0); + private final Event startEvent4 = new Event(40, "start", 4.0); + private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10); + private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10); + private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10); + private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10); + private final Event nextOne = new Event(44, "next-one", 1.0); + private final Event endEvent = new Event(46, "end", 1.0); + + @Test + public void testIterativeWithBranchingPatternEager() { + List> actual = testIterativeWithBranchingPattern(true); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent1), + Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4), + Lists.newArrayList(startEvent2, endEvent, middleEvent3) + ) + ); + } + + @Test + public void testIterativeWithBranchingPatternCombinations() { + List> actual = testIterativeWithBranchingPattern(false); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent3), + Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent2), + Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1), + Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent4), + Lists.newArrayList(startEvent1, endEvent, middleEvent1), + Lists.newArrayList(startEvent1, endEvent, middleEvent2), + Lists.newArrayList(startEvent1, endEvent, middleEvent3), + Lists.newArrayList(startEvent2, endEvent, middleEvent4), + Lists.newArrayList(startEvent2, endEvent, middleEvent3) + ) + ); + } + + private List> testIterativeWithBranchingPattern(boolean eager) { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1)); + inputEvents.add(new StreamRecord(middleEvent1, 2)); + inputEvents.add(new StreamRecord(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(startEvent2, 4)); + inputEvents.add(new StreamRecord(middleEvent3, 5)); + inputEvents.add(new StreamRecord(middleEvent4, 5)); + inputEvents.add(new StreamRecord<>(nextOne, 6)); + inputEvents.add(new StreamRecord<>(endEvent, 8)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value, Context ctx) throws Exception { + if (!value.getName().startsWith("foo")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("middle")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } + }).oneOrMore(eager).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + return resultingPatterns; + } + + @Test + public void testIterativeWithLoopingStartingEager() { + List> actual = testIterativeWithLoopingStarting(true); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent1, endEvent), + Lists.newArrayList(startEvent2, endEvent), + Lists.newArrayList(startEvent3, endEvent), + Lists.newArrayList(endEvent) + ) + ); + } + + @Test + public void testIterativeWithLoopingStartingCombination() { + List> actual = testIterativeWithLoopingStarting(false); + + compareMaps(actual, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent1, startEvent3, endEvent), + Lists.newArrayList(startEvent1, endEvent), + Lists.newArrayList(startEvent2, endEvent), + Lists.newArrayList(startEvent3, endEvent), + Lists.newArrayList(endEvent) + ) + ); + } + + private List> testIterativeWithLoopingStarting(boolean eager) { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); + inputEvents.add(new StreamRecord<>(startEvent3, 3L)); + inputEvents.add(new StreamRecord<>(endEvent, 4L)); + + // for now, a pattern inherits its continuity property from the followedBy() or next(), and the default + // behavior (which is the one applied in the case that the pattern graph starts with such a pattern) + // of a looping pattern is with relaxed continuity (as in followedBy). + + Pattern pattern = Pattern.begin("start").where(new IterativeCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + if (!value.getName().equals("start")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("start")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } + }).zeroOrMore(eager).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + return resultingPatterns; + } + + @Test + public void testIterativeWithPrevPatternDependency() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); + inputEvents.add(new StreamRecord<>(endEvent, 4L)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).oneOrMore().followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + if (!value.getName().equals("end")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("start")) { + sum += event.getPrice(); + } + return Double.compare(sum, 2.0) >= 0; + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + compareMaps(resultingPatterns, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent2, endEvent) + ) + ); + } + + @Test + public void testIterativeWithABACPattern() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1 + inputEvents.add(new StreamRecord(middleEvent1, 2L)); //1 + + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2 + inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3 + inputEvents.add(new StreamRecord(middleEvent2, 2L)); //2 + + inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4 + inputEvents.add(new StreamRecord(middleEvent3, 2L)); //3 + inputEvents.add(new StreamRecord(middleEvent4, 2L)); //1 + inputEvents.add(new StreamRecord<>(endEvent, 4L)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 2178338526904474690L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().startsWith("foo"); + } + }).followedBy("middle2").where(new IterativeCondition() { + private static final long serialVersionUID = -1223388426808292695L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + if (!value.getName().equals("start")) { + return false; + } + + double sum = 0.0; + for (Event e: ctx.getEventsForPattern("middle2")) { + sum += e.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) <= 0; + } + }).oneOrMore().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 562590474115118323L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + compareMaps(resultingPatterns, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent1, endEvent), + Lists.newArrayList(startEvent1, middleEvent1, startEvent2, endEvent), + Lists.newArrayList(startEvent1, middleEvent2, startEvent4, endEvent), + Lists.newArrayList(startEvent2, middleEvent2, startEvent4, endEvent), + Lists.newArrayList(startEvent3, middleEvent2, startEvent4, endEvent) + ) + ); + } + + @Test + public void testIterativeWithPrevPatternDependencyAfterBranching() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(startEvent1, 1L)); + inputEvents.add(new StreamRecord<>(startEvent2, 2L)); + inputEvents.add(new StreamRecord(middleEvent1, 4L)); + inputEvents.add(new StreamRecord<>(startEvent3, 5L)); + inputEvents.add(new StreamRecord(middleEvent2, 6L)); + inputEvents.add(new StreamRecord<>(endEvent, 7L)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).oneOrMore().followedBy("middle1").subtype(SubEvent.class).where(new SimpleCondition() { + private static final long serialVersionUID = 2178338526904474690L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().startsWith("foo"); + } + }).followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + if (!value.getName().equals("end")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("start")) { + sum += event.getPrice(); + } + return Double.compare(sum, 2.0) >= 0; + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = new ArrayList<>(); + + for (StreamRecord inputEvent : inputEvents) { + Collection> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + + compareMaps(resultingPatterns, + Lists.>newArrayList( + Lists.newArrayList(startEvent1, startEvent2, middleEvent1, endEvent), + Lists.newArrayList(startEvent2, middleEvent1, endEvent), + Lists.newArrayList(startEvent1, startEvent2, middleEvent2, endEvent), + Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent2, endEvent), + Lists.newArrayList(startEvent2, startEvent3, middleEvent2, endEvent), + Lists.newArrayList(startEvent2, middleEvent2, endEvent), + Lists.newArrayList(startEvent3, middleEvent2, endEvent) + ) + ); + } + + private void compareMaps(List> actual, List> expected) { + Assert.assertEquals(expected.size(), actual.size()); + + for (List p: actual) { + Collections.sort(p, new EventComparator()); + } + + for (List p: expected) { + Collections.sort(p, new EventComparator()); + } + + Collections.sort(actual, new ListEventComparator()); + Collections.sort(expected, new ListEventComparator()); + Assert.assertArrayEquals(expected.toArray(), actual.toArray()); + } + + private class ListEventComparator implements Comparator> { + + @Override + public int compare(List o1, List o2) { + int sizeComp = Integer.compare(o1.size(), o2.size()); + if (sizeComp == 0) { + EventComparator comp = new EventComparator(); + for (int i = 0; i < o1.size(); i++) { + int eventComp = comp.compare(o1.get(i), o2.get(i)); + if (eventComp != 0) { + return eventComp; + } + } + return 0; + } else { + return sizeComp; + } + } + } + + private class EventComparator implements Comparator { + + @Override + public int compare(Event o1, Event o2) { + int nameComp = o1.getName().compareTo(o2.getName()); + int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice()); + int idComp = Integer.compare(o1.getId(), o2.getId()); + if (nameComp == 0) { + if (priceComp == 0) { + return idComp; + } else { + return priceComp; + } + } else { + return nameComp; + } + } + } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 40a0e7eca2524..d2e392b30ab28 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -19,9 +19,9 @@ package org.apache.flink.cep.nfa; import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.cep.Event; -import org.apache.flink.cep.pattern.FilterFunctions; +import org.apache.flink.cep.pattern.conditions.BooleanConditions; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -58,7 +58,7 @@ public void testSimpleNFA() { startState.addTake( endState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = -4869589195918650396L; @Override @@ -68,7 +68,7 @@ public boolean filter(Event value) throws Exception { }); endState.addTake( endingState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = 2979804163709590673L; @Override @@ -76,7 +76,7 @@ public boolean filter(Event value) throws Exception { return value.getName().equals("end"); } }); - endState.addIgnore(FilterFunctions.trueFunction()); + endState.addIgnore(BooleanConditions.trueFunction()); nfa.addState(startState); nfa.addState(endState); @@ -241,7 +241,7 @@ private NFA createStartEndNFA(long windowLength) { startState.addTake( endState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = -4869589195918650396L; @Override @@ -251,7 +251,7 @@ public boolean filter(Event value) throws Exception { }); endState.addTake( endingState, - new FilterFunction() { + new SimpleCondition() { private static final long serialVersionUID = 2979804163709590673L; @Override @@ -259,7 +259,7 @@ public boolean filter(Event value) throws Exception { return value.getName().equals("end"); } }); - endState.addIgnore(FilterFunctions.trueFunction()); + endState.addIgnore(BooleanConditions.trueFunction()); nfa.addState(startState); nfa.addState(endState); @@ -268,7 +268,7 @@ public boolean filter(Event value) throws Exception { return nfa; } - private static class NameFilter implements FilterFunction { + private static class NameFilter extends SimpleCondition { private static final long serialVersionUID = 7472112494752423802L; diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index 25618d5407c3c..f0a25d266bda2 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -84,11 +84,16 @@ public void testSharedBuffer() { sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0")); Collection> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); - sharedBuffer.remove("b", events[7], timestamp); + sharedBuffer.release("b", events[7], timestamp); Collection> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); Collection> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0")); Collection> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0")); - sharedBuffer.remove("b", events[5], timestamp); + sharedBuffer.release("b", events[5], timestamp); + + assertEquals(1L, patterns3.size()); + assertEquals(0L, patterns4.size()); + assertEquals(1L, patterns1.size()); + assertEquals(1L, patterns2.size()); assertTrue(sharedBuffer.isEmpty()); assertTrue(patterns4.isEmpty()); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index 93d78cc3eea13..80b1bcb981583 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -20,7 +20,6 @@ import com.google.common.collect.Sets; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -32,6 +31,7 @@ import org.apache.flink.cep.nfa.StateTransitionAction; import org.apache.flink.cep.pattern.MalformedPatternException; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.util.TestLogger; import org.junit.Rule; import org.junit.Test; @@ -48,7 +48,7 @@ public class NFACompilerTest extends TestLogger { - private static final FilterFunction startFilter = new FilterFunction() { + private static final SimpleCondition startFilter = new SimpleCondition() { private static final long serialVersionUID = 3314714776170474221L; @Override @@ -57,7 +57,7 @@ public boolean filter(Event value) throws Exception { } }; - private static final FilterFunction endFilter = new FilterFunction() { + private static final SimpleCondition endFilter = new SimpleCondition() { private static final long serialVersionUID = 3990995859716364087L; @Override @@ -91,7 +91,7 @@ public void testNFACompilerUniquePatternName() { * A filter implementation to test invalid pattern specification with * duplicate pattern names. Check {@link #testNFACompilerUniquePatternName()}. */ - private static class TestFilter implements FilterFunction { + private static class TestFilter extends SimpleCondition { private static final long serialVersionUID = -3863103355752267133L; diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 5a3e623776ca7..b83eb3c4f5445 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -18,7 +18,6 @@ package org.apache.flink.cep.operator; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.ByteSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -29,6 +28,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -239,7 +239,7 @@ public NFA createNFA() { } } - private static class StartFilter implements FilterFunction { + private static class StartFilter extends SimpleCondition { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -248,7 +248,7 @@ public boolean filter(Event value) throws Exception { } } - private static class MiddleFilter implements FilterFunction { + private static class MiddleFilter extends SimpleCondition { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -257,7 +257,7 @@ public boolean filter(SubEvent value) throws Exception { } } - private static class EndFilter implements FilterFunction { + private static class EndFilter extends SimpleCondition { private static final long serialVersionUID = 7056763917392056548L; @Override diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java index 65fa7335ab15d..f230bbc429995 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java @@ -17,7 +17,6 @@ */ package org.apache.flink.cep.operator; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -26,6 +25,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; @@ -448,7 +448,7 @@ public NFA createNFA() { } } - private static class StartFilter implements FilterFunction { + private static class StartFilter extends SimpleCondition { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -457,7 +457,7 @@ public boolean filter(Event value) throws Exception { } } - private static class MiddleFilter implements FilterFunction { + private static class MiddleFilter extends SimpleCondition { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -466,7 +466,7 @@ public boolean filter(SubEvent value) throws Exception { } } - private static class EndFilter implements FilterFunction { + private static class EndFilter extends SimpleCondition { private static final long serialVersionUID = 7056763917392056548L; @Override diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index a99db051a9544..726c8b874fe18 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -30,6 +29,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; @@ -434,7 +434,7 @@ public void testCEPOperatorCleanupProcessingTime() throws Exception { harness.close(); } - + private void verifyWatermark(Object outputObject, long timestamp) { assertTrue(outputObject instanceof Watermark); assertEquals(timestamp, ((Watermark) outputObject).getTimestamp()); @@ -512,7 +512,7 @@ private NFAFactory(boolean handleTimeout) { @Override public NFA createNFA() { - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -520,7 +520,7 @@ public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }) - .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction() { + .followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -528,7 +528,7 @@ public boolean filter(SubEvent value) throws Exception { return value.getVolume() > 5.0; } }) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index 399662aa33202..2c86648a8d76e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.Event; @@ -27,6 +26,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; @@ -371,7 +371,7 @@ private NFAFactory(boolean handleTimeout) { @Override public NFA createNFA() { - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -379,7 +379,7 @@ public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }) - .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction() { + .followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition() { private static final long serialVersionUID = 6215754202506583964L; @Override @@ -387,7 +387,7 @@ public boolean filter(SubEvent value) throws Exception { return value.getVolume() > 5.0; } }) - .followedBy("end").where(new FilterFunction() { + .followedBy("end").where(new SimpleCondition() { private static final long serialVersionUID = 7056763917392056548L; @Override diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index 68b0419751fb2..e9aa7d2fc1ac2 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -18,9 +18,11 @@ package org.apache.flink.cep.pattern; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.cep.Event; import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.pattern.conditions.OrCondition; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.cep.pattern.conditions.SubtypeCondition; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -66,14 +68,14 @@ public void testNonStrictContiguity() { @Test public void testStrictContiguityWithCondition() { - Pattern pattern = Pattern.begin("start").next("next").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").next("next").where(new SimpleCondition() { private static final long serialVersionUID = -7657256242101104925L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("foobar"); } - }).next("end").where(new FilterFunction() { + }).next("end").where(new SimpleCondition() { private static final long serialVersionUID = -7597452389191504189L; @Override @@ -89,9 +91,9 @@ public boolean filter(Event value) throws Exception { assertNotNull(previous2 = previous.getPrevious()); assertNull(previous2.getPrevious()); - assertNotNull(pattern.getFilterFunction()); - assertNotNull(previous.getFilterFunction()); - assertNull(previous2.getFilterFunction()); + assertNotNull(pattern.getCondition()); + assertNotNull(previous.getCondition()); + assertNull(previous2.getCondition()); assertEquals(pattern.getName(), "end"); assertEquals(previous.getName(), "next"); @@ -109,8 +111,8 @@ public void testPatternWithSubtyping() { assertNotNull(previous2 = previous.getPrevious()); assertNull(previous2.getPrevious()); - assertNotNull(previous.getFilterFunction()); - assertTrue(previous.getFilterFunction() instanceof SubtypeFilterFunction); + assertNotNull(previous.getCondition()); + assertTrue(previous.getCondition() instanceof SubtypeCondition); assertEquals(pattern.getName(), "end"); assertEquals(previous.getName(), "subevent"); @@ -119,7 +121,7 @@ public void testPatternWithSubtyping() { @Test public void testPatternWithSubtypingAndFilter() { - Pattern pattern = Pattern.begin("start").next("subevent").subtype(SubEvent.class).where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").next("subevent").subtype(SubEvent.class).where(new SimpleCondition() { private static final long serialVersionUID = -4118591291880230304L; @Override @@ -136,7 +138,7 @@ public boolean filter(SubEvent value) throws Exception { assertNull(previous2.getPrevious()); assertTrue(pattern instanceof FollowedByPattern); - assertNotNull(previous.getFilterFunction()); + assertNotNull(previous.getCondition()); assertEquals(pattern.getName(), "end"); assertEquals(previous.getName(), "subevent"); @@ -145,21 +147,21 @@ public boolean filter(SubEvent value) throws Exception { @Test public void testPatternWithOrFilter() { - Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 3518061453394250543L; @Override public boolean filter(Event value) throws Exception { return false; } - }).or(new FilterFunction() { + }).or(new SimpleCondition() { private static final long serialVersionUID = 947463545810023841L; @Override public boolean filter(Event value) throws Exception { return false; } - }).next("or").or(new FilterFunction() { + }).next("or").or(new SimpleCondition() { private static final long serialVersionUID = -2775487887505922250L; @Override @@ -176,8 +178,8 @@ public boolean filter(Event value) throws Exception { assertNull(previous2.getPrevious()); assertTrue(pattern instanceof FollowedByPattern); - assertFalse(previous.getFilterFunction() instanceof OrFilterFunction); - assertTrue(previous2.getFilterFunction() instanceof OrFilterFunction); + assertFalse(previous.getCondition() instanceof OrCondition); + assertTrue(previous2.getCondition() instanceof OrCondition); assertEquals(pattern.getName(), "end"); assertEquals(previous.getName(), "or"); @@ -187,7 +189,9 @@ public boolean filter(Event value) throws Exception { @Test(expected = MalformedPatternException.class) public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception { - Pattern.begin("start").where(new FilterFunction() { + Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 8876425689668531458L; + @Override public boolean filter(Object value) throws Exception { return true; @@ -198,7 +202,9 @@ public boolean filter(Object value) throws Exception { @Test(expected = MalformedPatternException.class) public void testPatternCanHaveQuantifierSpecifiedOnce2() throws Exception { - Pattern.begin("start").where(new FilterFunction() { + Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 8311890695733430258L; + @Override public boolean filter(Object value) throws Exception { return true; @@ -209,7 +215,9 @@ public boolean filter(Object value) throws Exception { @Test(expected = MalformedPatternException.class) public void testPatternCanHaveQuantifierSpecifiedOnce3() throws Exception { - Pattern.begin("start").where(new FilterFunction() { + Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 8093713196099078214L; + @Override public boolean filter(Object value) throws Exception { return true; @@ -220,7 +228,9 @@ public boolean filter(Object value) throws Exception { @Test(expected = MalformedPatternException.class) public void testPatternCanHaveQuantifierSpecifiedOnce4() throws Exception { - Pattern.begin("start").where(new FilterFunction() { + Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -2995187062849334113L; + @Override public boolean filter(Object value) throws Exception { return true; @@ -231,7 +241,9 @@ public boolean filter(Object value) throws Exception { @Test(expected = MalformedPatternException.class) public void testPatternCanHaveQuantifierSpecifiedOnce5() throws Exception { - Pattern.begin("start").where(new FilterFunction() { + Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = -2205071036073867531L; + @Override public boolean filter(Object value) throws Exception { return true; From ad21a441434b9ac5886b664871553bf57885e984 Mon Sep 17 00:00:00 2001 From: kl0u Date: Tue, 28 Mar 2017 10:46:49 +0200 Subject: [PATCH 2/2] comments --- docs/dev/libs/cep.md | 15 +++------------ .../apache/flink/cep/scala/pattern/Pattern.scala | 15 +++++++++++++++ .../main/java/org/apache/flink/cep/nfa/NFA.java | 6 +++--- .../pattern/conditions/IterativeCondition.java | 2 +- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index 9d4ca913c2f7e..932ba307584fe 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -146,11 +146,10 @@ start.where(new IterativeCondition() { return false; } - double sum = 0.0; + double sum = value.getPrice(); for (Event event : ctx.getEventsForPattern("middle")) { sum += event.getPrice(); } - sum += value.getPrice(); return Double.compare(sum, 5.0) < 0; } }); @@ -161,16 +160,8 @@ start.where(new IterativeCondition() { {% highlight scala %} start.where( (value, ctx) => { - var res = value.getName.startsWith("foo") - if (res) { - var sum = 0.0 - for (e: Event <- ctx.getEventsForPattern("middle")) { - sum += e.getPrice - } - sum += value.getPrice - res = res && sum < 5.0 - } - res + lazy val sum = ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum + value.getName.startsWith("foo") && sum + value.getPrice < 5.0 } ) {% endhighlight %} diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index a1db460a46326..07dfc5a97d831 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -144,6 +144,21 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { this } + /** + * Specifies a filter condition which is ORed with an existing filter function. + * + * @param filterFun Or filter function + * @return The same pattern operator where the new filter condition is set + */ + def or(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = { + val filter = new IterativeCondition[F] { + val cleanFilter = cep.scala.cleanClosure(filterFun) + + override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ctx) + } + or(filter) + } + /** * Specifies a filter condition which has to be fulfilled by an event in order to be matched. * diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index cddc1edf759a5..98c1fc9541ed9 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -92,7 +92,7 @@ public class NFA implements Serializable { private final NonDuplicatingTypeSerializer nonDuplicatingTypeSerializer; /** - * Used only for backward compatibility. Buffer used to store the matched events. + * Used only for backwards compatibility. Buffer used to store the matched events. */ private final SharedBuffer, T> sharedBuffer = null; @@ -575,7 +575,7 @@ Map> extractCurrentMatches(final ComputationState computation computationState.getVersion()); // for a given computation state, we cannot have more than one matching patterns. - Preconditions.checkArgument(paths.size() <= 1); + Preconditions.checkState(paths.size() <= 1); TypeSerializer serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); @@ -609,7 +609,7 @@ private Collection> extractPatternMatches(final ComputationState< computationState.getVersion()); // for a given computation state, we cannot have more than one matching patterns. - Preconditions.checkArgument(paths.size() <= 1); + Preconditions.checkState(paths.size() <= 1); List> result = new ArrayList<>(); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java index f225e01ae7d5f..016cdefaa1506 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java @@ -88,7 +88,7 @@ public interface Context extends Serializable { /** * @return An {@link Iterable} over the already accepted elements - * for a given pattern. Elements are iterated in the order the were + * for a given pattern. Elements are iterated in the order they were * inserted in the pattern. * * @param name The name of the pattern.