From 76c1a4516b4bc98043d944335cc7a0aacd359278 Mon Sep 17 00:00:00 2001 From: minwenjun Date: Wed, 6 Jun 2018 00:07:55 +0800 Subject: [PATCH 1/2] Fix wrong semantic when greedy pattern is the head of the pattern --- .../flink/cep/nfa/ComputationState.java | 17 +++ .../java/org/apache/flink/cep/nfa/NFA.java | 29 ++++- .../java/org/apache/flink/cep/nfa/State.java | 15 ++- .../apache/flink/cep/nfa/StateTransition.java | 2 +- .../flink/cep/nfa/compiler/NFACompiler.java | 10 +- .../apache/flink/cep/nfa/GreedyITCase.java | 108 ++++++++++++++++++ 6 files changed, 171 insertions(+), 10 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java index 88ef3d3288a7b..1685ffacc7b9d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -91,6 +91,10 @@ public boolean isStartState() { return state.isStart() && event == null; } + public boolean isGreedyState() { + return state.isGreedy(); + } + public long getTimestamp() { return timestamp; } @@ -137,6 +141,19 @@ public int hashCode() { return Objects.hash(state, event, counter, timestamp, version, startTimestamp, previousState); } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + builder.append("current state: ").append(state).append("\n") + .append("previous state: ").append(previousState).append("\n") + .append("start timestamp: ").append(startTimestamp).append("\n") + .append("counter: ").append(counter).append("\n") + .append("version: ").append(version); + + return builder.toString(); + } + public static ComputationState createStartState(final NFA nfa, final State state) { Preconditions.checkArgument(state.isStart()); return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 5624db9de435a..21c83740194d1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -220,7 +220,10 @@ public Tuple2>>, Collection>>, Collection>, Long>>> process(final T event, final long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy) { + List> redundantStart = pickRedundantGreedyStartState(new ArrayList<>(computationStates)); + computationStates.removeAll(redundantStart); final int numberComputationStates = computationStates.size(); + final Collection>> result = new ArrayList<>(); final Collection>, Long>> timeoutResult = new ArrayList<>(); @@ -309,7 +312,7 @@ public Tuple2>>, Collection> computeNextStates( return resultingComputationStates; } + private List> pickRedundantGreedyStartState(List> computationStates) { + List> greedyState = new ArrayList<>(); + List> startState = new ArrayList<>(); + List> redundantStart = new ArrayList<>(); + for (ComputationState computationState : computationStates) { + if (computationState.isGreedyState()) { + greedyState.add(computationState); + } else if (computationState.isStartState()) { + startState.add(computationState); + } + } + + for(ComputationState start: startState) { + for(ComputationState greedy : greedyState) { + if (NFAStateNameHandler.getOriginalNameFromInternal(start.getState().getName()).equals( + NFAStateNameHandler.getOriginalNameFromInternal(greedy.getState().getName()))) + { + redundantStart.add(start); + } + } + } + return redundantStart; + } + private void addComputationState( List> computationStates, State currentState, diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java index 0120398b4b2a8..b2c80370d0e90 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java @@ -60,6 +60,14 @@ public boolean isStart() { return stateType == StateType.Start; } + public boolean isStop() { + return stateType == StateType.Stop; + } + + public boolean isGreedy() { + return stateType == StateType.Greedy; + } + public String getName() { return name; } @@ -131,10 +139,6 @@ public int hashCode() { return Objects.hash(name, stateType, stateTransitions); } - public boolean isStop() { - return stateType == StateType.Stop; - } - /** * Set of valid state types. */ @@ -142,6 +146,7 @@ public enum StateType { Start, // the state is a starting state for the NFA Final, // the state is a final state for the NFA Normal, // the state is neither a start nor a final state - Stop + Stop, // the state will lead to the match to failed + Greedy // the state has the greedy proerty } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java index d624f3bb6de66..308143d6c7290 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java @@ -93,7 +93,7 @@ public String toString() { .append("StateTransition(") .append(action).append(", ") .append("from ").append(sourceState.getName()) - .append("to ").append(targetState.getName()) + .append(" to ").append(targetState.getName()) .append(condition != null ? ", with condition)" : ")") .toString(); } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 39e8d34acef4d..20ccb6b796688 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -573,7 +573,9 @@ private State createSingletonState(final State sinkState, return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional); } - final State singletonState = createState(currentPattern.getName(), State.StateType.Normal); + State.StateType type = currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) ? State.StateType.Greedy : State.StateType.Normal; + + final State singletonState = createState(currentPattern.getName(), type); // if event is accepted then all notPatterns previous to the optional states are no longer valid final State sink = copyWithoutTransitiveNots(sinkState); singletonState.addTake(sink, takeCondition); @@ -707,7 +709,9 @@ private State createLooping(final State sinkState) { true); IterativeCondition proceedCondition = getTrueFunction(); - final State loopingState = createState(currentPattern.getName(), State.StateType.Normal); + State.StateType type = currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) ? State.StateType.Greedy : State.StateType.Normal; + + final State loopingState = createState(currentPattern.getName(), type); if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { if (untilCondition != null) { @@ -728,7 +732,7 @@ private State createLooping(final State sinkState) { addStopStateToLooping(loopingState); if (ignoreCondition != null) { - final State ignoreState = createState(currentPattern.getName(), State.StateType.Normal); + final State ignoreState = createState(currentPattern.getName(), type); ignoreState.addTake(loopingState, takeCondition); ignoreState.addIgnore(ignoreCondition); loopingState.addIgnore(ignoreState, ignoreCondition); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java index 2c7f23c024f65..1c6ec4fcca085 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java @@ -904,4 +904,112 @@ public boolean filter(Event value) throws Exception { Lists.newArrayList(c, a1, a2, a3, a4, d) )); } + + @Test + public void testGreedyConsecutiveStartState() { + List> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + Event a4 = new Event(45, "a", 2.0); + Event a5 = new Event(46, "a", 2.0); + Event d2 = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + inputEvents.add(new StreamRecord<>(a4, 5)); + inputEvents.add(new StreamRecord<>(a5, 5)); + inputEvents.add(new StreamRecord<>(d2, 5)); + + + // a* d + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }) + .oneOrMore() + .optional() + .consecutive() + .greedy() + .followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(a1, a2, a3, d), + Lists.newArrayList(a4, a5, d2) + )); + } + + @Test + public void testGreedyReleaxStartState() { + List> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + Event a4 = new Event(45, "a", 2.0); + Event a5 = new Event(46, "a", 2.0); + Event d2 = new Event(47, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + inputEvents.add(new StreamRecord<>(a4, 5)); + inputEvents.add(new StreamRecord<>(a5, 5)); + inputEvents.add(new StreamRecord<>(d2, 5)); + + + // a* d + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }) + .oneOrMore() + .optional() + .greedy() + .followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(a1, a2, a3, d), + Lists.newArrayList(a1, a2, a3, a4, a5, d2) + )); + } + } From 24f2c527f63833fbdf1eafbd9a26c904334f216d Mon Sep 17 00:00:00 2001 From: minwenjun Date: Wed, 6 Jun 2018 06:20:09 +0800 Subject: [PATCH 2/2] checkstyle --- .../src/main/java/org/apache/flink/cep/nfa/NFA.java | 7 +++---- .../test/java/org/apache/flink/cep/nfa/GreedyITCase.java | 2 -- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 21c83740194d1..16ea163aadfe7 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -641,11 +641,10 @@ private List> pickRedundantGreedyStartState(List start: startState) { - for(ComputationState greedy : greedyState) { + for (ComputationState start: startState) { + for (ComputationState greedy : greedyState) { if (NFAStateNameHandler.getOriginalNameFromInternal(start.getState().getName()).equals( - NFAStateNameHandler.getOriginalNameFromInternal(greedy.getState().getName()))) - { + NFAStateNameHandler.getOriginalNameFromInternal(greedy.getState().getName()))) { redundantStart.add(start); } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java index 1c6ec4fcca085..d7198403b8afb 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java @@ -927,7 +927,6 @@ public void testGreedyConsecutiveStartState() { inputEvents.add(new StreamRecord<>(a5, 5)); inputEvents.add(new StreamRecord<>(d2, 5)); - // a* d Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L; @@ -981,7 +980,6 @@ public void testGreedyReleaxStartState() { inputEvents.add(new StreamRecord<>(a5, 5)); inputEvents.add(new StreamRecord<>(d2, 5)); - // a* d Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { private static final long serialVersionUID = 5726188262756267490L;