From 36b9baaaffecf8be32b5e8bb5887ccbd43430caf Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Tue, 27 Jun 2017 20:38:53 +0800 Subject: [PATCH 1/4] [FLINK-7008] [cep] Update NFA state only when the NFA changes --- .../flink/cep/nfa/ComputationState.java | 23 ++++++ .../java/org/apache/flink/cep/nfa/NFA.java | 28 ++++++- .../apache/flink/cep/nfa/SharedBuffer.java | 17 +++- .../AbstractKeyedCEPPatternOperator.java | 10 ++- .../org/apache/flink/cep/nfa/NFATest.java | 80 +++++++++++++++++++ 5 files changed, 150 insertions(+), 8 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java index 44f8f394962c7..91005c48f6222 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Helper class which encapsulates the state of the NFA computation. It points to the current state, @@ -114,6 +115,28 @@ public DeweyNumber getVersion() { return version; } + @Override + public boolean equals(Object obj) { + if (obj instanceof ComputationState) { + ComputationState other = (ComputationState) obj; + return Objects.equals(state, other.state) && + event == other.event && + counter == other.counter && + timestamp == other.timestamp && + Objects.equals(version, other.version) && + startTimestamp == other.startTimestamp && + Objects.equals(previousState, other.previousState); + + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(state, event, counter, timestamp, version, startTimestamp, previousState); + } + public static ComputationState createStartState(final NFA nfa, final State state) { Preconditions.checkArgument(state.isStart()); return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index a6c5bdeba2ff1..0baff92763939 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; @@ -152,6 +153,12 @@ public class NFA implements Serializable { private TypeSerializer eventSerializer; + /** + * Flag indicating whether the matching status of the state machine has changed. + */ + @VisibleForTesting + boolean nfaChanged; + public NFA( final TypeSerializer eventSerializer, final long windowTime, @@ -164,6 +171,7 @@ public NFA( this.eventSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); this.computationStates = new LinkedList<>(); this.states = new HashSet<>(); + this.nfaChanged = false; } public Set> getStates() { @@ -195,6 +203,15 @@ public boolean isEmpty() { return eventSharedBuffer.isEmpty(); } + /** + * Check if the matching status of the NFA has changed so far. + * + * @return {@code true} if matching status has changed, {@code false} otherwise + */ + public boolean isNFAChanged() { + return nfaChanged; + } + /** * Processes the next input event. If some of the computations reach a final state then the * resulting event sequences are returned. If computations time out and timeout handling is @@ -237,8 +254,15 @@ public Tuple2>>, Collection>>, Collection>> iter = pages.entrySet().iterator(); + boolean pruned = false; while (iter.hasNext()) { SharedBufferPage page = iter.next().getValue(); - page.prune(pruningTimestamp); + if (page.prune(pruningTimestamp)) { + pruned = true; + } if (page.isEmpty()) { // delete page if it is empty iter.remove(); } } + + return pruned; } /** @@ -488,20 +494,25 @@ public SharedBufferEntry get(final ValueTimeWrapper valueTime) { * Removes all entries from the map whose timestamp is smaller than the pruning timestamp. * * @param pruningTimestamp Timestamp for the pruning + * @return {@code true} if pruning happened */ - public void prune(long pruningTimestamp) { + public boolean prune(long pruningTimestamp) { Iterator, SharedBufferEntry>> iterator = entries.entrySet().iterator(); boolean continuePruning = true; + boolean pruned = false; while (iterator.hasNext() && continuePruning) { SharedBufferEntry entry = iterator.next().getValue(); if (entry.getValueTime().getTimestamp() <= pruningTimestamp) { iterator.remove(); + pruned = true; } else { continuePruning = false; } } + + return pruned; } public boolean isEmpty() { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 2e3aefd0dc73b..89b5695a5be5f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -270,10 +270,12 @@ private NFA getNFA() throws IOException { } private void updateNFA(NFA nfa) throws IOException { - if (nfa.isEmpty()) { - nfaOperatorState.clear(); - } else { - nfaOperatorState.update(nfa); + if (nfa.isNFAChanged()) { + if (nfa.isEmpty()) { + nfaOperatorState.clear(); + } else { + nfaOperatorState.update(nfa); + } } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 25863423fbc00..2a0ed671b35ab 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -26,6 +26,7 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; @@ -44,6 +45,8 @@ import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Tests for {@link NFA}. @@ -324,6 +327,83 @@ public boolean filter(Event value) throws Exception { } } + @Test + public void testNFAChange() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("not").where(new IterativeCondition() { + private static final long serialVersionUID = -6085237016591726715L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("e"); + } + }).within(Time.milliseconds(10)); + + NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); + NFA nfa = nfaFactory.createNFA(); + nfa.process(new Event(1, "b", 1.0), 1L); + assertFalse(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(new Event(2, "a", 1.0), 2L); + assertTrue(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(new Event(3, "f", 1.0), 3L); + assertTrue(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(new Event(4, "f", 1.0), 4L); + assertFalse(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(new Event(5, "b", 1.0), 5L); + assertTrue(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(new Event(6, "d", 1.0), 6L); + assertTrue(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(new Event(7, "f", 1.0), 7L); + assertFalse(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(null, 8L); + assertFalse(nfa.isNFAChanged()); + + nfa.nfaChanged = false; + nfa.process(null, 12L); + assertTrue(nfa.isNFAChanged()); + } + private NFA createStartEndNFA(long windowLength) { NFA nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false); From 9c179fc28a0d07b4a895bc5ccb4daa844f8e44b2 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Wed, 28 Jun 2017 13:31:36 +0800 Subject: [PATCH 2/4] Add more comments for the tests --- .../org/apache/flink/cep/nfa/NFATest.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 2a0ed671b35ab..4edcb5c009b02 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; @@ -368,40 +369,46 @@ public boolean filter(Event value, Context ctx) throws Exception { NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); NFA nfa = nfaFactory.createNFA(); + nfa.process(new Event(1, "b", 1.0), 1L); - assertFalse(nfa.isNFAChanged()); + assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfa.isNFAChanged()); nfa.nfaChanged = false; nfa.process(new Event(2, "a", 1.0), 2L); - assertTrue(nfa.isNFAChanged()); + assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfa.isNFAChanged()); + // the status of the queue of ComputationStatus changed, + // more than one ComputationStatus is generated by the event from some ComputationStatus nfa.nfaChanged = false; nfa.process(new Event(3, "f", 1.0), 3L); - assertTrue(nfa.isNFAChanged()); + assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfa.isNFAChanged()); + // both the queue of ComputationStatus and eventSharedBuffer have not changed nfa.nfaChanged = false; nfa.process(new Event(4, "f", 1.0), 4L); - assertFalse(nfa.isNFAChanged()); + assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfa.isNFAChanged()); + // both the queue of ComputationStatus and eventSharedBuffer have changed nfa.nfaChanged = false; nfa.process(new Event(5, "b", 1.0), 5L); - assertTrue(nfa.isNFAChanged()); + assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfa.isNFAChanged()); + // both the queue of ComputationStatus and eventSharedBuffer have changed nfa.nfaChanged = false; nfa.process(new Event(6, "d", 1.0), 6L); - assertTrue(nfa.isNFAChanged()); - - nfa.nfaChanged = false; - nfa.process(new Event(7, "f", 1.0), 7L); - assertFalse(nfa.isNFAChanged()); + assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfa.isNFAChanged()); + // both the queue of ComputationStatus and eventSharedBuffer have not changed + // as the timestamp is within the window nfa.nfaChanged = false; nfa.process(null, 8L); - assertFalse(nfa.isNFAChanged()); + assertFalse("NFA status should not change as the timestamp is within the window", nfa.isNFAChanged()); + // timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will + // be removed from eventSharedBuffer as the timeout happens nfa.nfaChanged = false; - nfa.process(null, 12L); - assertTrue(nfa.isNFAChanged()); + Collection>, Long>> timeoutResults = nfa.process(null, 12L).f1; + assertTrue("NFA status should change as timeout happens", nfa.isNFAChanged() && !timeoutResults.isEmpty()); } private NFA createStartEndNFA(long windowLength) { From 8c35f0f522beb8ad1a6d29be59dacc5a6f7564e8 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Wed, 28 Jun 2017 20:16:10 +0800 Subject: [PATCH 3/4] Update according to the review comments --- .../flink/cep/nfa/ComputationState.java | 2 +- .../java/org/apache/flink/cep/nfa/NFA.java | 17 +- .../AbstractKeyedCEPPatternOperator.java | 1 + .../flink/cep/nfa/NFAStatusChangeITCase.java | 175 ++++++++++++++++++ .../org/apache/flink/cep/nfa/NFATest.java | 87 --------- .../flink/cep/operator/CEPOperatorTest.java | 123 ++++++++++++ 6 files changed, 310 insertions(+), 95 deletions(-) create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java index 91005c48f6222..88ef3d3288a7b 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -120,7 +120,7 @@ public boolean equals(Object obj) { if (obj instanceof ComputationState) { ComputationState other = (ComputationState) obj; return Objects.equals(state, other.state) && - event == other.event && + Objects.equals(event, other.event) && counter == other.counter && timestamp == other.timestamp && Objects.equals(version, other.version) && diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 0baff92763939..6e7eb9f39fcec 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -18,7 +18,8 @@ package org.apache.flink.cep.nfa; -import org.apache.flink.annotation.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; @@ -43,11 +44,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterators; - import javax.annotation.Nullable; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -156,8 +153,7 @@ public class NFA implements Serializable { /** * Flag indicating whether the matching status of the state machine has changed. */ - @VisibleForTesting - boolean nfaChanged; + private boolean nfaChanged; public NFA( final TypeSerializer eventSerializer, @@ -212,6 +208,13 @@ public boolean isNFAChanged() { return nfaChanged; } + /** + * Reset {@link #nfaChanged} to {@code false}. + */ + public void resetNFAChanged() { + this.nfaChanged = false; + } + /** * Processes the next input event. If some of the computations reach a final state then the * resulting event sequences are returned. If computations time out and timeout handling is diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 89b5695a5be5f..a9f7ecfaa5b1e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -274,6 +274,7 @@ private void updateNFA(NFA nfa) throws IOException { if (nfa.isEmpty()) { nfaOperatorState.clear(); } else { + nfa.resetNFAChanged(); nfaOperatorState.update(nfa); } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java new file mode 100644 index 0000000000000..c96cc4173d7b9 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.nfa; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.api.windowing.time.Time; + +import org.junit.Test; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class NFAStatusChangeITCase { + + @Test + public void testNFAChange() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedByAny("middle").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("e"); + } + }).within(Time.milliseconds(10)); + + NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); + NFA nfa = nfaFactory.createNFA(); + + nfa.process(new Event(1, "b", 1.0), 1L); + assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfa.isNFAChanged()); + + nfa.resetNFAChanged(); + nfa.process(new Event(2, "a", 1.0), 2L); + assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfa.isNFAChanged()); + + // the status of the queue of ComputationStatus changed, + // more than one ComputationStatus is generated by the event from some ComputationStatus + nfa.resetNFAChanged(); + nfa.process(new Event(3, "f", 1.0), 3L); + assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfa.isNFAChanged()); + + // both the queue of ComputationStatus and eventSharedBuffer have not changed + nfa.resetNFAChanged(); + nfa.process(new Event(4, "f", 1.0), 4L); + assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfa.isNFAChanged()); + + // both the queue of ComputationStatus and eventSharedBuffer have changed + nfa.resetNFAChanged(); + nfa.process(new Event(5, "b", 1.0), 5L); + assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfa.isNFAChanged()); + + // both the queue of ComputationStatus and eventSharedBuffer have changed + nfa.resetNFAChanged(); + nfa.process(new Event(6, "d", 1.0), 6L); + assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfa.isNFAChanged()); + + // both the queue of ComputationStatus and eventSharedBuffer have not changed + // as the timestamp is within the window + nfa.resetNFAChanged(); + nfa.process(null, 8L); + assertFalse("NFA status should not change as the timestamp is within the window", nfa.isNFAChanged()); + + // timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will + // be removed from eventSharedBuffer as the timeout happens + nfa.resetNFAChanged(); + Collection>, Long>> timeoutResults = nfa.process(null, 12L).f1; + assertTrue("NFA status should change as timeout happens", nfa.isNFAChanged() && !timeoutResults.isEmpty()); + } + + @Test + public void testNFAChangedOnOneNewComputationState() { + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("a*").where(new SimpleCondition() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().next("end").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("b"); + } + }).within(Time.milliseconds(10)); + + NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); + NFA nfa = nfaFactory.createNFA(); + + nfa.resetNFAChanged(); + nfa.process(new Event(6, "start", 1.0), 6L); + + nfa.resetNFAChanged(); + nfa.process(new Event(6, "a", 1.0), 7L); + assertTrue(nfa.isNFAChanged()); + } + + @Test + public void testNFAChangedOnTimeoutWithoutPrune() { + Pattern pattern = Pattern.begin("start").where(new IterativeCondition() { + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("end").where(new IterativeCondition() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context ctx) throws Exception { + return value.getName().equals("end"); + } + }).within(Time.milliseconds(10)); + + NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); + NFA nfa = nfaFactory.createNFA(); + + nfa.resetNFAChanged(); + nfa.process(new Event(6, "start", 1.0), 6L); + + nfa.resetNFAChanged(); + nfa.process(new Event(6, "end", 1.0), 17L); + assertTrue(nfa.isNFAChanged()); + } +} diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index 4edcb5c009b02..25863423fbc00 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -18,7 +18,6 @@ package org.apache.flink.cep.nfa; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; @@ -27,7 +26,6 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; @@ -46,8 +44,6 @@ import java.util.Set; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; /** * Tests for {@link NFA}. @@ -328,89 +324,6 @@ public boolean filter(Event value) throws Exception { } } - @Test - public void testNFAChange() { - Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { - private static final long serialVersionUID = 1858562682635302605L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).notFollowedBy("not").where(new IterativeCondition() { - private static final long serialVersionUID = -6085237016591726715L; - - @Override - public boolean filter(Event value, Context ctx) throws Exception { - return value.getName().equals("c"); - } - }).followedByAny("middle").where(new IterativeCondition() { - private static final long serialVersionUID = 8061969839441121955L; - - @Override - public boolean filter(Event value, Context ctx) throws Exception { - return value.getName().equals("b"); - } - }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition() { - private static final long serialVersionUID = 8061969839441121955L; - - @Override - public boolean filter(Event value, Context ctx) throws Exception { - return value.getName().equals("d"); - } - }).followedBy("end").where(new IterativeCondition() { - private static final long serialVersionUID = 8061969839441121955L; - - @Override - public boolean filter(Event value, Context ctx) throws Exception { - return value.getName().equals("e"); - } - }).within(Time.milliseconds(10)); - - NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); - NFA nfa = nfaFactory.createNFA(); - - nfa.process(new Event(1, "b", 1.0), 1L); - assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfa.isNFAChanged()); - - nfa.nfaChanged = false; - nfa.process(new Event(2, "a", 1.0), 2L); - assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfa.isNFAChanged()); - - // the status of the queue of ComputationStatus changed, - // more than one ComputationStatus is generated by the event from some ComputationStatus - nfa.nfaChanged = false; - nfa.process(new Event(3, "f", 1.0), 3L); - assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfa.isNFAChanged()); - - // both the queue of ComputationStatus and eventSharedBuffer have not changed - nfa.nfaChanged = false; - nfa.process(new Event(4, "f", 1.0), 4L); - assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfa.isNFAChanged()); - - // both the queue of ComputationStatus and eventSharedBuffer have changed - nfa.nfaChanged = false; - nfa.process(new Event(5, "b", 1.0), 5L); - assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfa.isNFAChanged()); - - // both the queue of ComputationStatus and eventSharedBuffer have changed - nfa.nfaChanged = false; - nfa.process(new Event(6, "d", 1.0), 6L); - assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfa.isNFAChanged()); - - // both the queue of ComputationStatus and eventSharedBuffer have not changed - // as the timestamp is within the window - nfa.nfaChanged = false; - nfa.process(null, 8L); - assertFalse("NFA status should not change as the timestamp is within the window", nfa.isNFAChanged()); - - // timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will - // be removed from eventSharedBuffer as the timeout happens - nfa.nfaChanged = false; - Collection>, Long>> timeoutResults = nfa.process(null, 12L).f1; - assertTrue("NFA status should change as timeout happens", nfa.isNFAChanged() && !timeoutResults.isEmpty()); - } - private NFA createStartEndNFA(long windowLength) { NFA nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index d83c191bdecd4..ab14f905d83ba 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -296,6 +296,85 @@ public void testKeyedAdvancingTimeWithoutElements() throws Exception { } } + @Test + public void testKeyedCEPOperatorNFAChanged() throws Exception { + + String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); + RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + + KeyedCEPPatternOperator operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(operator); + + try { + harness.setStateBackend(rocksDBStateBackend); + + harness.open(); + + Event startEvent = new Event(42, "c", 1.0); + SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0); + Event endEvent = new Event(42, "b", 1.0); + + harness.processElement(new StreamRecord<>(startEvent, 1L)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + harness.close(); + + operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + harness = getCepTestHarness(operator); + + rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + harness.setStateBackend(rocksDBStateBackend); + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L)); + OperatorStateHandles snapshot2 = harness.snapshot(0L, 0L); + harness.close(); + + operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + harness = getCepTestHarness(operator); + + rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + harness.setStateBackend(rocksDBStateBackend); + harness.setup(); + harness.initializeState(snapshot2); + harness.open(); + + harness.processElement(new StreamRecord(middleEvent, 4L)); + harness.processElement(new StreamRecord<>(endEvent, 4L)); + + // get and verify the output + + Queue result = harness.getOutput(); + + assertEquals(1, result.size()); + + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + } finally { + harness.close(); + } + } + @Test public void testCEPOperatorCleanupEventTime() throws Exception { @@ -900,4 +979,48 @@ public boolean filter(Event value) throws Exception { return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); } } + + private static class SimpleNFAFactory implements NFACompiler.NFAFactory { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private SimpleNFAFactory() { + this(false); + } + + private SimpleNFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA createNFA() { + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }).within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } } From fa8a76a9a53af44850db447f42334d8cf31ff245 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Thu, 29 Jun 2017 09:57:59 +0800 Subject: [PATCH 4/4] Add NFA update times tests --- .../java/org/apache/flink/cep/nfa/NFA.java | 6 +- .../flink/cep/nfa/NFAStatusChangeITCase.java | 5 + .../flink/cep/operator/CEPOperatorTest.java | 158 +++++++++++++++++- 3 files changed, 166 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 6e7eb9f39fcec..f561be46fbad8 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -18,8 +18,6 @@ package org.apache.flink.cep.nfa; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterators; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; @@ -44,7 +42,11 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; + import javax.annotation.Nullable; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java index c96cc4173d7b9..37ad0064bbe75 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.cep.nfa; import org.apache.flink.api.java.tuple.Tuple2; @@ -34,6 +35,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +/** + * Tests if the {@link NFA} status ({@link NFA#computationStates} or {@link NFA#eventSharedBuffer}) + * is changed after processing events. + */ public class NFAStatusChangeITCase { @Test diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index ab14f905d83ba..b98d2414425ba 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -47,6 +48,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import java.util.ArrayList; import java.util.Collections; @@ -297,7 +300,73 @@ public void testKeyedAdvancingTimeWithoutElements() throws Exception { } @Test - public void testKeyedCEPOperatorNFAChanged() throws Exception { + public void testKeyedCEPOperatorNFAUpdate() throws Exception { + KeyedCEPPatternOperator operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(operator); + + try { + harness.open(); + + Event startEvent = new Event(42, "c", 1.0); + SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0); + Event endEvent = new Event(42, "b", 1.0); + + harness.processElement(new StreamRecord<>(startEvent, 1L)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + harness.close(); + + operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + harness = getCepTestHarness(operator); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L)); + OperatorStateHandles snapshot2 = harness.snapshot(0L, 0L); + harness.close(); + + operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + harness = getCepTestHarness(operator); + + harness.setup(); + harness.initializeState(snapshot2); + harness.open(); + + harness.processElement(new StreamRecord(middleEvent, 4L)); + harness.processElement(new StreamRecord<>(endEvent, 4L)); + + // get and verify the output + + Queue result = harness.getOutput(); + + assertEquals(1, result.size()); + + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + } finally { + harness.close(); + } + } + + @Test + public void testKeyedCEPOperatorNFAUpdateWithRocksDB() throws Exception { String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); @@ -375,6 +444,93 @@ public void testKeyedCEPOperatorNFAChanged() throws Exception { } } + @Test + public void testKeyedCEPOperatorNFAUpdateTimes() throws Exception { + KeyedCEPPatternOperator operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(operator); + + try { + harness.open(); + + final ValueState nfaOperatorState = (ValueState) Whitebox.getInternalState(operator, "nfaOperatorState"); + final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState); + Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy); + + Event startEvent = new Event(42, "c", 1.0); + SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0); + Event endEvent = new Event(42, "b", 1.0); + + harness.processElement(new StreamRecord<>(startEvent, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L)); + harness.processElement(new StreamRecord(middleEvent, 4L)); + harness.processElement(new StreamRecord<>(endEvent, 4L)); + + // verify the number of invocations NFA is updated + Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any()); + + // get and verify the output + Queue result = harness.getOutput(); + + assertEquals(1, result.size()); + + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + } finally { + harness.close(); + } + } + + @Test + public void testKeyedCEPOperatorNFAUpdateTimesWithRocksDB() throws Exception { + + String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); + RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + + KeyedCEPPatternOperator operator = new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + true, + IntSerializer.INSTANCE, + new SimpleNFAFactory(), + true); + OneInputStreamOperatorTestHarness>> harness = getCepTestHarness(operator); + + try { + harness.setStateBackend(rocksDBStateBackend); + + harness.open(); + + final ValueState nfaOperatorState = (ValueState) Whitebox.getInternalState(operator, "nfaOperatorState"); + final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState); + Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy); + + Event startEvent = new Event(42, "c", 1.0); + SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0); + Event endEvent = new Event(42, "b", 1.0); + + harness.processElement(new StreamRecord<>(startEvent, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L)); + harness.processElement(new StreamRecord(middleEvent, 4L)); + harness.processElement(new StreamRecord<>(endEvent, 4L)); + + // verify the number of invocations NFA is updated + Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any()); + + // get and verify the output + Queue result = harness.getOutput(); + + assertEquals(1, result.size()); + + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + } finally { + harness.close(); + } + } + @Test public void testCEPOperatorCleanupEventTime() throws Exception {