From e4b7e1383ef99b60fa00b2cdf3967df547152dc4 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Wed, 14 Jun 2017 11:19:41 +0800 Subject: [PATCH 1/6] [FLINK-6904] [cep] Support for quantifier range to CEP's pattern API --- .../flink/cep/nfa/compiler/NFACompiler.java | 29 +- .../org/apache/flink/cep/pattern/Pattern.java | 30 +- .../apache/flink/cep/pattern/Quantifier.java | 33 ++ .../org/apache/flink/cep/nfa/NFAITCase.java | 511 ++++++++++++++++++ 4 files changed, 596 insertions(+), 7 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index ce42acdfaa107..b34fe1a10ebde 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -28,6 +28,7 @@ import org.apache.flink.cep.pattern.MalformedPatternException; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.Quantifier; +import org.apache.flink.cep.pattern.Quantifier.Times; import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.NotCondition; @@ -372,14 +373,34 @@ private void addStopStateToLooping(final State loopingState) { * @param times number of times the state should be copied * @return the first state of the "complex" state, next state should point to it */ - private State createTimesState(final State sinkState, int times) { + private State createTimesState(final State sinkState, Times times) { State lastSink = copyWithoutTransitiveNots(sinkState); - for (int i = 0; i < times - 1; i++) { - lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false); + + final IterativeCondition currentCondition = (IterativeCondition) currentPattern.getCondition(); + final IterativeCondition innerIgnoreCondition = getInnerIgnoreCondition(currentPattern); + + for (int i = times.getFrom(); i < times.getTo(); i++) { + final State optionalState = createState(currentPattern.getName(), State.StateType.Normal); + optionalState.addTake(lastSink, currentCondition); + optionalState.addProceed(sinkState, BooleanConditions.trueFunction()); + + if (innerIgnoreCondition != null) { + State ignoreState = createState(currentPattern.getName(), State.StateType.Normal); + ignoreState.addTake(lastSink, currentCondition); + ignoreState.addIgnore(innerIgnoreCondition); + optionalState.addIgnore(ignoreState, innerIgnoreCondition); + addStopStates(ignoreState); + } + + lastSink = optionalState; + addStopStates(lastSink); + } + + for (int i = 0; i < times.getFrom() - 1; i++) { + lastSink = createSingletonState(lastSink, innerIgnoreCondition, false); addStopStateToLooping(lastSink); } - final IterativeCondition currentCondition = (IterativeCondition) currentPattern.getCondition(); final IterativeCondition ignoreCondition = getIgnoreCondition(currentPattern); // we created the intermediate states in the loop, now we create the start of the loop. diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 2676994255a50..da5995930c0cd 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy; +import org.apache.flink.cep.pattern.Quantifier.Times; import org.apache.flink.cep.pattern.conditions.AndCondition; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.OrCondition; @@ -64,7 +65,7 @@ public class Pattern { * Applicable to a {@code times} pattern, and holds * the number of times it has to appear. */ - private int times; + private Times times; protected Pattern(final String name, final Pattern previous) { this.name = name; @@ -84,7 +85,7 @@ protected Pattern( return previous; } - public int getTimes() { + public Times getTimes() { return times; } @@ -318,7 +319,30 @@ public Pattern times(int times) { checkIfQuantifierApplied(); Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0."); this.quantifier = Quantifier.times(quantifier.getConsumingStrategy()); - this.times = times; + this.times = Times.of(times); + return this; + } + + /** + * Specifies that the pattern can occur between from and to times. + * + * @param from number of times matching event must appear at least + * @param to number of times matching event must appear at most + * @return The same pattern with the number of times range applied + * + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern times(int from, int to) { + checkIfNoNotPattern(); + checkIfQuantifierApplied(); + Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0."); + Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + "."); + Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0."); + this.quantifier = Quantifier.times(quantifier.getConsumingStrategy()); + if (from == 0) { + this.quantifier.optional(); + } + this.times = Times.of(from, to); return this; } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java index efc7cf43b4dda..504fec0db0453 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -18,6 +18,8 @@ package org.apache.flink.cep.pattern; +import org.apache.flink.util.Preconditions; + import java.util.EnumSet; import java.util.Objects; @@ -143,4 +145,35 @@ public enum ConsumingStrategy { NOT_NEXT } + /** + * Describe the times this {@link Pattern} can occur. + */ + public static class Times { + private final int from; + private final int to; + + private Times(int from, int to) { + Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0."); + Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + "."); + Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0."); + this.from = from; + this.to = to; + } + + public int getFrom() { + return from; + } + + public int getTo() { + return to; + } + + public static Times of(int from, int to) { + return new Times(from, to); + } + + public static Times of(int times) { + return new Times(times, times); + } + } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 20cb48299ec04..a4869f86fb960 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -1390,6 +1390,248 @@ public boolean filter(Event value) throws Exception { compareMaps(resultingPatterns, Lists.>newArrayList()); } + @Test + public void testTimesRange() { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, end1) + )); + } + + @Test + public void testTimesRangeNonStrict() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeStrict() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(0, 3).consecutive().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeStrictOptional() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeStrictOptional1() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + @Test public void testStartWithOptional() { List> inputEvents = new ArrayList<>(); @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception { )); } + @Test + public void testTimesRangeNonStrictOptional1() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictOptional2() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictOptional3() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictWithNext() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNotStrictWithFollowedByEager() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNotStrictWithFollowedByNotEager() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + /////////////////////////////// Consecutive //////////////////////////////////////// private static class ConsecutiveData { From 14cf664c68cfe3c091be8a5927b5f620a81e28dd Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Wed, 14 Jun 2017 21:50:03 +0800 Subject: [PATCH 2/6] Move the tests to a separate file --- .../org/apache/flink/cep/nfa/NFAITCase.java | 512 ---------------- .../flink/cep/nfa/TimesRangeITCase.java | 564 ++++++++++++++++++ 2 files changed, 564 insertions(+), 512 deletions(-) create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index a4869f86fb960..506587b5b1ec3 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -1390,248 +1390,6 @@ public boolean filter(Event value) throws Exception { compareMaps(resultingPatterns, Lists.>newArrayList()); } - @Test - public void testTimesRange() { - List> inputEvents = new ArrayList<>(); - - Event startEvent = new Event(40, "c", 1.0); - Event middleEvent1 = new Event(41, "a", 2.0); - Event middleEvent2 = new Event(42, "a", 3.0); - Event middleEvent3 = new Event(43, "a", 4.0); - Event end1 = new Event(44, "b", 5.0); - - inputEvents.add(new StreamRecord<>(startEvent, 1)); - inputEvents.add(new StreamRecord<>(middleEvent1, 2)); - inputEvents.add(new StreamRecord<>(middleEvent2, 3)); - inputEvents.add(new StreamRecord<>(middleEvent3, 4)); - inputEvents.add(new StreamRecord<>(end1, 6)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).next("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - final List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), - Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), - Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1), - Lists.newArrayList(startEvent, middleEvent1, end1) - )); - } - - @Test - public void testTimesRangeNonStrict() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeStrict() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(0, 3).consecutive().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeStrictOptional() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeStrictOptional1() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).next("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) - )); - } - @Test public void testStartWithOptional() { List> inputEvents = new ArrayList<>(); @@ -2146,276 +1904,6 @@ public boolean filter(Event value) throws Exception { Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) )); } - - @Test - public void testTimesRangeNonStrictOptional1() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(1, 3).optional().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeNonStrictOptional2() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeNonStrictOptional3() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(2, 3).optional().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeNonStrictWithNext() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); - inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).next("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeNotStrictWithFollowedByEager() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(2, 3).followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end) - )); - } - - @Test - public void testTimesRangeNotStrictWithFollowedByNotEager() { - List> inputEvents = new ArrayList<>(); - - inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); - inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedByAny("middle").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); - - NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - - List> resultingPatterns = feedNFA(inputEvents, nfa); - - compareMaps(resultingPatterns, Lists.>newArrayList( - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), - Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) - )); - } - /////////////////////////////// Consecutive //////////////////////////////////////// private static class ConsecutiveData { diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java new file mode 100644 index 0000000000000..1f9ce54bab108 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import com.google.common.collect.Lists; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * Tests for {@link Pattern#times(int, int)}. + */ +@SuppressWarnings("unchecked") +public class TimesRangeITCase extends TestLogger { + + @Test + public void testTimesRange() { + List> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "a", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event end1 = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(middleEvent2, 3)); + inputEvents.add(new StreamRecord<>(middleEvent3, 4)); + inputEvents.add(new StreamRecord<>(end1, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1), + Lists.newArrayList(startEvent, middleEvent1, end1) + )); + } + + @Test + public void testTimesRangeNonStrict() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeStrict() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(0, 3).consecutive().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeStrictOptional() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeStrictOptional1() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + + @Test + public void testTimesRangeNonStrictOptional1() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(1, 3).optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictOptional2() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictOptional3() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).optional().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNonStrictWithNext() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNotStrictWithFollowedBy() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end) + )); + } + + @Test + public void testTimesRangeNotStrictWithFollowedByAny() { + List> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + private static class ConsecutiveData { + private static final Event startEvent = new Event(40, "c", 1.0); + private static final Event middleEvent1 = new Event(41, "a", 2.0); + private static final Event middleEvent2 = new Event(42, "a", 3.0); + private static final Event middleEvent3 = new Event(43, "a", 4.0); + private static final Event end = new Event(44, "b", 5.0); + + private ConsecutiveData() { + } + } +} From dc28eeb2a0f9cdbc6ad276a398837482e7989c7c Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Mon, 19 Jun 2017 13:47:18 +0800 Subject: [PATCH 3/6] Refactor NFACompiler to reduce code duplication --- .../flink/cep/nfa/compiler/NFACompiler.java | 66 ++++++------------- 1 file changed, 21 insertions(+), 45 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index b34fe1a10ebde..10347be188b7e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -264,7 +264,7 @@ private State convertPattern(final State sinkState) { } else { lastSink = createSingletonState(sinkState); } - addStopStates(lastSink); + addStopState(lastSink); return lastSink; } @@ -349,7 +349,7 @@ private State copyWithoutTransitiveNots(final State sinkState) { return copyOfSink; } - private void addStopStates(final State state) { + private void addStopState(final State state) { for (Tuple2, String> notCondition: getCurrentNotCondition()) { final State stopState = createStopState(notCondition.f0, notCondition.f1); state.addProceed(stopState, notCondition.f0); @@ -374,52 +374,22 @@ private void addStopStateToLooping(final State loopingState) { * @return the first state of the "complex" state, next state should point to it */ private State createTimesState(final State sinkState, Times times) { - State lastSink = copyWithoutTransitiveNots(sinkState); - - final IterativeCondition currentCondition = (IterativeCondition) currentPattern.getCondition(); + State lastSink = sinkState; final IterativeCondition innerIgnoreCondition = getInnerIgnoreCondition(currentPattern); - for (int i = times.getFrom(); i < times.getTo(); i++) { - final State optionalState = createState(currentPattern.getName(), State.StateType.Normal); - optionalState.addTake(lastSink, currentCondition); - optionalState.addProceed(sinkState, BooleanConditions.trueFunction()); - - if (innerIgnoreCondition != null) { - State ignoreState = createState(currentPattern.getName(), State.StateType.Normal); - ignoreState.addTake(lastSink, currentCondition); - ignoreState.addIgnore(innerIgnoreCondition); - optionalState.addIgnore(ignoreState, innerIgnoreCondition); - addStopStates(ignoreState); - } - - lastSink = optionalState; - addStopStates(lastSink); + lastSink = createSingletonState(lastSink, sinkState, innerIgnoreCondition, true); + addStopStateToLooping(lastSink); } - for (int i = 0; i < times.getFrom() - 1; i++) { - lastSink = createSingletonState(lastSink, innerIgnoreCondition, false); + lastSink = createSingletonState(lastSink, null, innerIgnoreCondition, false); addStopStateToLooping(lastSink); } - - final IterativeCondition ignoreCondition = getIgnoreCondition(currentPattern); - // we created the intermediate states in the loop, now we create the start of the loop. - if (!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) { - return createSingletonState(lastSink, ignoreCondition, false); - } - - final State singletonState = createState(currentPattern.getName(), State.StateType.Normal); - singletonState.addTake(lastSink, currentCondition); - singletonState.addProceed(sinkState, BooleanConditions.trueFunction()); - - if (ignoreCondition != null) { - State ignoreState = createState(currentPattern.getName(), State.StateType.Normal); - ignoreState.addTake(lastSink, currentCondition); - ignoreState.addIgnore(ignoreCondition); - singletonState.addIgnore(ignoreState, ignoreCondition); - addStopStates(ignoreState); - } - return singletonState; + return createSingletonState( + lastSink, + sinkState, + getIgnoreCondition(currentPattern), + currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)); } /** @@ -433,6 +403,7 @@ private State createTimesState(final State sinkState, Times times) { @SuppressWarnings("unchecked") private State createSingletonState(final State sinkState) { return createSingletonState( + sinkState, sinkState, getIgnoreCondition(currentPattern), currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)); @@ -445,10 +416,15 @@ private State createSingletonState(final State sinkState) { * * @param ignoreCondition condition that should be applied to IGNORE transition * @param sinkState state that the state being converted should point to + * @param proceedState state that the state being converted should proceed to + * @param isOptional whether the state being converted is optional * @return the created state */ @SuppressWarnings("unchecked") - private State createSingletonState(final State sinkState, final IterativeCondition ignoreCondition, final boolean isOptional) { + private State createSingletonState(final State sinkState, + final State proceedState, + final IterativeCondition ignoreCondition, + final boolean isOptional) { final IterativeCondition currentCondition = (IterativeCondition) currentPattern.getCondition(); final IterativeCondition trueFunction = BooleanConditions.trueFunction(); @@ -459,7 +435,7 @@ private State createSingletonState(final State sinkState, final IterativeC if (isOptional) { // if no element accepted the previous nots are still valid. - singletonState.addProceed(sinkState, trueFunction); + singletonState.addProceed(proceedState, trueFunction); } if (ignoreCondition != null) { @@ -468,7 +444,7 @@ private State createSingletonState(final State sinkState, final IterativeC ignoreState = createState(currentPattern.getName(), State.StateType.Normal); ignoreState.addTake(sink, currentCondition); ignoreState.addIgnore(ignoreCondition); - addStopStates(ignoreState); + addStopState(ignoreState); } else { ignoreState = singletonState; } @@ -551,7 +527,7 @@ private State createInitOptionalStateOfZeroOrMore(final State loopingState firstStateWithoutProceed.addIgnore(ignoreFunction); firstStateWithoutProceed.addTake(loopingState, currentCondition); - addStopStates(firstStateWithoutProceed); + addStopState(firstStateWithoutProceed); } return firstState; } From f2a51e5564b05077c6eddc95f0a7033a634f60e6 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Mon, 19 Jun 2017 13:50:58 +0800 Subject: [PATCH 4/6] rename addStopState back to addStopStates --- .../org/apache/flink/cep/nfa/compiler/NFACompiler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 10347be188b7e..b5a437b97fc57 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -264,7 +264,7 @@ private State convertPattern(final State sinkState) { } else { lastSink = createSingletonState(sinkState); } - addStopState(lastSink); + addStopStates(lastSink); return lastSink; } @@ -349,7 +349,7 @@ private State copyWithoutTransitiveNots(final State sinkState) { return copyOfSink; } - private void addStopState(final State state) { + private void addStopStates(final State state) { for (Tuple2, String> notCondition: getCurrentNotCondition()) { final State stopState = createStopState(notCondition.f0, notCondition.f1); state.addProceed(stopState, notCondition.f0); @@ -444,7 +444,7 @@ private State createSingletonState(final State sinkState, ignoreState = createState(currentPattern.getName(), State.StateType.Normal); ignoreState.addTake(sink, currentCondition); ignoreState.addIgnore(ignoreCondition); - addStopState(ignoreState); + addStopStates(ignoreState); } else { ignoreState = singletonState; } @@ -527,7 +527,7 @@ private State createInitOptionalStateOfZeroOrMore(final State loopingState firstStateWithoutProceed.addIgnore(ignoreFunction); firstStateWithoutProceed.addTake(loopingState, currentCondition); - addStopState(firstStateWithoutProceed); + addStopStates(firstStateWithoutProceed); } return firstState; } From 39ae8fbe75808479ac5ebc7466bcf41bfdd291c8 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Mon, 19 Jun 2017 15:14:05 +0800 Subject: [PATCH 5/6] remove redundant check --- .../src/main/java/org/apache/flink/cep/pattern/Pattern.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index da5995930c0cd..8767a947255b6 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -335,9 +335,6 @@ public Pattern times(int times) { public Pattern times(int from, int to) { checkIfNoNotPattern(); checkIfQuantifierApplied(); - Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0."); - Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + "."); - Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0."); this.quantifier = Quantifier.times(quantifier.getConsumingStrategy()); if (from == 0) { this.quantifier.optional(); From 1c36e1326c743efb50062716295e8e356e7b459b Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Mon, 19 Jun 2017 20:04:45 +0800 Subject: [PATCH 6/6] Fix the checkstyle issues --- .../test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java index 1f9ce54bab108..4305fa2f4c595 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java @@ -18,13 +18,14 @@ package org.apache.flink.cep.nfa; -import com.google.common.collect.Lists; import org.apache.flink.cep.Event; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; import org.junit.Test; import java.util.ArrayList; @@ -281,7 +282,6 @@ public boolean filter(Event value) throws Exception { )); } - @Test public void testTimesRangeNonStrictOptional1() { List> inputEvents = new ArrayList<>();