From 7f27c637bd74a8a59e0dd66f604093545cd2b452 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 5 Jul 2016 17:56:49 +0200 Subject: [PATCH 1/8] Replace StreamEvent by StreamRecord in CEP Tests --- .../org/apache/flink/cep/StreamEvent.java | 41 ------------- .../org/apache/flink/cep/nfa/NFAITCase.java | 57 ++++++++++--------- .../org/apache/flink/cep/nfa/NFATest.java | 44 +++++++------- 3 files changed, 51 insertions(+), 91 deletions(-) delete mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java deleted file mode 100644 index adb94461665ad..0000000000000 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep; - -public class StreamEvent { - private final T event; - private final long timestamp; - - public StreamEvent(T event, long timestamp) { - this.event = event; - this.timestamp = timestamp; - } - - public long getTimestamp() { - return timestamp; - } - - public T getEvent() { - return event; - } - - public static StreamEvent of(V event, long timestamp) { - return new StreamEvent<>(event, timestamp); - } -} diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 615219104295d..4a33c1e1371aa 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -21,11 +21,11 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; -import org.apache.flink.cep.StreamEvent; import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -43,18 +43,18 @@ public class NFAITCase extends TestLogger { @Test public void testSimplePatternNFA() { - List> inputEvents = new ArrayList>(); + List> inputEvents = new ArrayList<>(); Event startEvent = new Event(42, "start", 1.0); SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); Event endEvent= new Event(43, "end", 1.0); - inputEvents.add(new StreamEvent(startEvent, 1)); - inputEvents.add(new StreamEvent(new Event(43, "foobar", 1.0), 2)); - inputEvents.add(new StreamEvent(new SubEvent(41, "barfoo", 1.0, 5.0), 3)); - inputEvents.add(new StreamEvent(middleEvent, 3)); - inputEvents.add(new StreamEvent(new Event(43, "start", 1.0), 4)); - inputEvents.add(new StreamEvent(endEvent, 5)); + inputEvents.add(new StreamRecord(startEvent, 1)); + inputEvents.add(new StreamRecord(new Event(43, "foobar", 1.0), 2)); + inputEvents.add(new StreamRecord(new SubEvent(41, "barfoo", 1.0, 5.0), 3)); + inputEvents.add(new StreamRecord(middleEvent, 3)); + inputEvents.add(new StreamRecord(new Event(43, "start", 1.0), 4)); + inputEvents.add(new StreamRecord(endEvent, 5)); Pattern pattern = Pattern.begin("start").where(new FilterFunction() { private static final long serialVersionUID = 5726188262756267490L; @@ -82,11 +82,12 @@ public boolean filter(Event value) throws Exception { }); NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + List> resultingPatterns = new ArrayList<>(); - for (StreamEvent inputEvent: inputEvents) { + for (StreamRecord inputEvent: inputEvents) { Collection> patterns = nfa.process( - inputEvent.getEvent(), + inputEvent.getValue(), inputEvent.getTimestamp()).f0; resultingPatterns.addAll(patterns); @@ -106,19 +107,19 @@ public boolean filter(Event value) throws Exception { */ @Test public void testSimplePatternWithTimeWindowNFA() { - List> events = new ArrayList<>(); + List> events = new ArrayList<>(); List> resultingPatterns = new ArrayList<>(); final Event startEvent; final Event middleEvent; final Event endEvent; - events.add(new StreamEvent(new Event(1, "start", 1.0), 1)); - events.add(new StreamEvent(startEvent = new Event(2, "start", 1.0), 2)); - events.add(new StreamEvent(middleEvent = new Event(3, "middle", 1.0), 3)); - events.add(new StreamEvent(new Event(4, "foobar", 1.0), 4)); - events.add(new StreamEvent(endEvent = new Event(5, "end", 1.0), 11)); - events.add(new StreamEvent(new Event(6, "end", 1.0), 13)); + events.add(new StreamRecord(new Event(1, "start", 1.0), 1)); + events.add(new StreamRecord(startEvent = new Event(2, "start", 1.0), 2)); + events.add(new StreamRecord(middleEvent = new Event(3, "middle", 1.0), 3)); + events.add(new StreamRecord(new Event(4, "foobar", 1.0), 4)); + events.add(new StreamRecord(endEvent = new Event(5, "end", 1.0), 11)); + events.add(new StreamRecord(new Event(6, "end", 1.0), 13)); Pattern pattern = Pattern.begin("start").where(new FilterFunction() { @@ -147,8 +148,8 @@ public boolean filter(Event value) throws Exception { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - for (StreamEvent event: events) { - Collection> patterns = nfa.process(event.getEvent(), event.getTimestamp()).f0; + for (StreamRecord event: events) { + Collection> patterns = nfa.process(event.getValue(), event.getTimestamp()).f0; resultingPatterns.addAll(patterns); } @@ -168,18 +169,18 @@ public boolean filter(Event value) throws Exception { */ @Test public void testSimplePatternWithTimeoutHandling() { - List> events = new ArrayList<>(); + List> events = new ArrayList<>(); List> resultingPatterns = new ArrayList<>(); Set, Long>> resultingTimeoutPatterns = new HashSet<>(); Set, Long>> expectedTimeoutPatterns = new HashSet<>(); - events.add(new StreamEvent(new Event(1, "start", 1.0), 1)); - events.add(new StreamEvent(new Event(2, "start", 1.0), 2)); - events.add(new StreamEvent(new Event(3, "middle", 1.0), 3)); - events.add(new StreamEvent(new Event(4, "foobar", 1.0), 4)); - events.add(new StreamEvent(new Event(5, "end", 1.0), 11)); - events.add(new StreamEvent(new Event(6, "end", 1.0), 13)); + events.add(new StreamRecord(new Event(1, "start", 1.0), 1)); + events.add(new StreamRecord(new Event(2, "start", 1.0), 2)); + events.add(new StreamRecord(new Event(3, "middle", 1.0), 3)); + events.add(new StreamRecord(new Event(4, "foobar", 1.0), 4)); + events.add(new StreamRecord(new Event(5, "end", 1.0), 11)); + events.add(new StreamRecord(new Event(6, "end", 1.0), 13)); Map timeoutPattern1 = new HashMap<>(); timeoutPattern1.put("start", new Event(1, "start", 1.0)); @@ -226,9 +227,9 @@ public boolean filter(Event value) throws Exception { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true); - for (StreamEvent event: events) { + for (StreamRecord event: events) { Tuple2>, Collection, Long>>> patterns = - nfa.process(event.getEvent(), event.getTimestamp()); + nfa.process(event.getValue(), event.getTimestamp()); Collection> matchedPatterns = patterns.f0; Collection, Long>> timeoutPatterns = patterns.f1; diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java index f48dab3bc525b..9f65132a26d5e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -21,7 +21,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.cep.Event; -import org.apache.flink.cep.StreamEvent; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -44,12 +44,12 @@ public class NFATest extends TestLogger { @Test public void testSimpleNFA() { NFA nfa = new NFA<>(Event.createTypeSerializer(), 0, false); - List> streamEvents = new ArrayList<>(); + List> streamEvents = new ArrayList<>(); - streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L)); - streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L)); - streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L)); - streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L)); + streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L)); + streamEvents.add(new StreamRecord<>(new Event(2, "bar", 2.0), 2L)); + streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L)); + streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L)); State startingState = new State<>("", State.StateType.Start); State startState = new State<>("start", State.StateType.Normal); @@ -111,12 +111,12 @@ public boolean filter(Event value) throws Exception { @Test public void testTimeoutWindowPruning() { NFA nfa = createStartEndNFA(2); - List> streamEvents = new ArrayList<>(); + List> streamEvents = new ArrayList<>(); - streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L)); - streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L)); - streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L)); - streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L)); + streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L)); + streamEvents.add(new StreamRecord<>(new Event(2, "bar", 2.0), 2L)); + streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L)); + streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L)); Set> expectedPatterns = new HashSet<>(); @@ -138,10 +138,10 @@ public void testTimeoutWindowPruning() { @Test public void testWindowBorders() { NFA nfa = createStartEndNFA(2); - List> streamEvents = new ArrayList<>(); + List> streamEvents = new ArrayList<>(); - streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L)); - streamEvents.add(StreamEvent.of(new Event(2, "end", 2.0), 3L)); + streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L)); + streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L)); Set> expectedPatterns = Collections.emptySet(); @@ -157,12 +157,12 @@ public void testWindowBorders() { @Test public void testTimeoutWindowPruningWindowBorders() { NFA nfa = createStartEndNFA(2); - List> streamEvents = new ArrayList<>(); + List> streamEvents = new ArrayList<>(); - streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L)); - streamEvents.add(StreamEvent.of(new Event(2, "start", 2.0), 2L)); - streamEvents.add(StreamEvent.of(new Event(3, "foobar", 3.0), 3L)); - streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 3L)); + streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L)); + streamEvents.add(new StreamRecord<>(new Event(2, "start", 2.0), 2L)); + streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L)); + streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L)); Set> expectedPatterns = new HashSet<>(); @@ -193,11 +193,11 @@ public void testStateNameGeneration() { assertEquals(expectedName3, generatedName3); } - public Collection> runNFA(NFA nfa, List> inputs) { + public Collection> runNFA(NFA nfa, List> inputs) { Set> actualPatterns = new HashSet<>(); - for (StreamEvent streamEvent: inputs) { - Collection> matchedPatterns = nfa.process(streamEvent.getEvent(), streamEvent.getTimestamp()).f0; + for (StreamRecord streamEvent: inputs) { + Collection> matchedPatterns = nfa.process(streamEvent.getValue(), streamEvent.getTimestamp()).f0; actualPatterns.addAll(matchedPatterns); } From b2969d3c562406b61bcc68aa5f8c45ded18a875d Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 5 Jul 2016 17:58:45 +0200 Subject: [PATCH 2/8] [FLINK-4149] Fix Serialization of NFA in AbstractKeyedCEPPatternOperator NFA is Serializable and has readObject()/writeObject() methods. In AbstractKeyedCEPPatternOperator a KryoSerializer was used as the TypeSerializer for the ValueState that holds NFA instances. Kryo does not call readObject()/writeObject() therefore the state of the NFA was invalid after deserialization. This change adds a new TypeSerializer for NFA that uses Java Serialization. In the long run it will be better to get rid of the readObject()/writeObject() methods and instead efficiently serialize using a specialized TypeSerializer. --- .../java/org/apache/flink/cep/nfa/NFA.java | 126 ++++++++++++++++ .../AbstractKeyedCEPPatternOperator.java | 3 +- .../flink/cep/operator/CEPOperatorTest.java | 140 +++++++++++++++--- 3 files changed, 247 insertions(+), 22 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index f769a2b5c750d..28b116db171e4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -19,12 +19,16 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.LinkedHashMultimap; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.NonDuplicatingTypeSerializer; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** + * {@link TypeSerializer} for {@link NFA} that uses Java Serialization. + */ + public static class Serializer extends TypeSerializer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + return this; + } + + @Override + public NFA createInstance() { + return null; + } + + @Override + public NFA copy(NFA from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA copy(NFA from, NFA reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); + } catch (IOException e) { + throw new RuntimeException("Could not serialize NFA.", e); + } + } + + @Override + public NFA deserialize(DataInputView source) throws IOException { + try { + int size = source.readInt(); + + byte[] data = new byte[size]; + + source.readFully(data); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA copy = (NFA) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not deserialize NFA.", e); + } } + + @Override + public NFA deserialize(NFA reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int size = source.readInt(); + target.writeInt(size); + target.write(source, size); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Serializer; + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof Serializer; + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 83892cab91242..366edf78fe204 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.memory.DataInputView; @@ -100,7 +99,7 @@ public void open() throws Exception { nfaOperatorState = getPartitionedState( new ValueStateDescriptor>( NFA_OPERATOR_STATE_NAME, - new KryoSerializer>((Class>) (Class) NFA.class, getExecutionConfig()), + new NFA.Serializer(), null)); } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index d5ef5be4ab11d..01057156b79b5 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -18,29 +18,35 @@ package org.apache.flink.cep.operator; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.junit.Assert.*; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; public class CEPOperatorTest extends TestLogger { @Test public void testCEPOperatorWatermarkForwarding() throws Exception { - OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( - new CEPPatternOperator( - IntSerializer.INSTANCE, + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( + new CEPPatternOperator<>( + Event.createTypeSerializer(), false, - new DummyNFAFactory<>(IntSerializer.INSTANCE)) + new NFAFactory()) ); harness.open(); @@ -59,22 +65,22 @@ public void testCEPOperatorWatermarkForwarding() throws Exception { @Test public void testKeyedCEPOperatorWatermarkForwarding() throws Exception { - KeySelector keySelector = new KeySelector() { + KeySelector keySelector = new KeySelector() { private static final long serialVersionUID = -4873366487571254798L; @Override - public Integer getKey(Integer value) throws Exception { - return value; + public Integer getKey(Event value) throws Exception { + return value.getId(); } }; - OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator( - IntSerializer.INSTANCE, + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, - new DummyNFAFactory<>(IntSerializer.INSTANCE)) + new NFAFactory()) ); harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -93,19 +99,113 @@ public Integer getKey(Integer value) throws Exception { harness.close(); } - public static class DummyNFAFactory implements NFACompiler.NFAFactory { + @Test + public void testKeyedCEPOperatorCheckpointing() throws Exception { + KeySelector keySelector = new KeySelector() { + private static final long serialVersionUID = -4873366487571254798L; - private static final long serialVersionUID = 1173020762472766713L; + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; - private final TypeSerializer inputTypeSerializer; + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); - public DummyNFAFactory(TypeSerializer inputTypeSerializer) { - this.inputTypeSerializer = inputTypeSerializer; - } + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + + harness.open(); + + Event startEvent = new Event(42, "start", 1.0); + SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + Event endEvent= new Event(42, "end", 1.0); + + harness.processElement(new StreamRecord(startEvent, 1)); + harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + + harness.processWatermark(new Watermark(2)); + + // simulate snapshot/restore + StreamTaskState snapshot = harness.snapshot(0, 0); + + harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + harness.setup(); + harness.restore(snapshot, 1); + harness.open(); + + harness.processElement(new StreamRecord(middleEvent, 3)); + harness.processElement(new StreamRecord(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord(endEvent, 5)); + + harness.processWatermark(new Watermark(Long.MAX_VALUE)); + + ConcurrentLinkedQueue result = harness.getOutput(); + + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord resultRecord = (StreamRecord) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap = (Map) resultRecord.getValue(); + + assertEquals(startEvent, patternMap.get("start")); + assertEquals(middleEvent, patternMap.get("middle")); + assertEquals(endEvent, patternMap.get("end")); + + harness.close(); + } + + private static class NFAFactory implements NFACompiler.NFAFactory { + + private static final long serialVersionUID = 1173020762472766713L; @Override - public NFA createNFA() { - return new NFA<>(inputTypeSerializer.duplicate(), 0, false); + public NFA createNFA() { + + Pattern pattern = Pattern.begin("start").where(new FilterFunction() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }) + .followedBy("end").where(new FilterFunction() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), false); } } } From 3696d4f6dcc69e8f1be86fa3607756ae91d38d61 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 6 Jul 2016 16:46:32 +0200 Subject: [PATCH 3/8] [FLINK-4162] Fix Event Queue Serialization in Abstract(Keyed)CEPPatternOperator Before, these were using StreamRecordSerializer, which does not serialize timestamps. Now it uses MultiplexingStreamRecordSerializer. This also extends the tests in CEPOperatorTest to test that timestamps are correctly checkpointed/restored. --- .../operator/AbstractCEPPatternOperator.java | 10 +- .../AbstractKeyedCEPPatternOperator.java | 14 +- .../flink/cep/operator/CEPOperatorTest.java | 122 +++++++++++++++++- 3 files changed, 133 insertions(+), 13 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java index 753656fa25388..ceaf8c5a9fac6 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java @@ -25,8 +25,9 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import java.io.IOException; @@ -46,7 +47,7 @@ abstract public class AbstractCEPPatternOperator extends AbstractCEPBasePatternOperator { private static final long serialVersionUID = 7487334510746595640L; - private final StreamRecordSerializer streamRecordSerializer; + private final MultiplexingStreamRecordSerializer streamRecordSerializer; // global nfa for all elements private NFA nfa; @@ -60,7 +61,7 @@ public AbstractCEPPatternOperator( NFACompiler.NFAFactory nfaFactory) { super(inputSerializer, isProcessingTime); - this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer); + this.streamRecordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); this.nfa = nfaFactory.createNFA(); } @@ -134,7 +135,8 @@ public void restoreState(StreamTaskState state, long recoveryTimestamp) throws E priorityQueue = new PriorityQueue>(numberPriorityQueueEntries, new StreamRecordComparator()); for (int i = 0; i asRecord()); } div.close(); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 366edf78fe204..1f2095b315525 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -29,8 +29,8 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import java.io.IOException; @@ -103,13 +103,17 @@ public void open() throws Exception { null)); } + @SuppressWarnings("unchecked,rawtypes") + TypeSerializer> streamRecordSerializer = + (TypeSerializer) new MultiplexingStreamRecordSerializer<>(getInputSerializer()); + if (priorityQueueOperatorState == null) { priorityQueueOperatorState = getPartitionedState( - new ValueStateDescriptor>>( + new ValueStateDescriptor<>( PRIORIRY_QUEUE_STATE_NAME, - new PriorityQueueSerializer>( - new StreamRecordSerializer(getInputSerializer()), - new PriorityQueueStreamRecordFactory()), + new PriorityQueueSerializer<>( + streamRecordSerializer, + new PriorityQueueStreamRecordFactory()), null)); } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 01057156b79b5..1f53d97f24f3c 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -99,6 +100,94 @@ public Integer getKey(Event value) throws Exception { harness.close(); } + @Test + public void testCEPOperatorCheckpointing() throws Exception { + KeySelector keySelector = new KeySelector() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( + new CEPPatternOperator<>( + Event.createTypeSerializer(), + false, + new NFAFactory())); + + harness.open(); + + Event startEvent = new Event(42, "start", 1.0); + SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + Event endEvent= new Event(42, "end", 1.0); + + harness.processElement(new StreamRecord(startEvent, 1)); + harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + + // simulate snapshot/restore with some elements in internal sorting queue + StreamTaskState snapshot = harness.snapshot(0, 0); + + harness = new OneInputStreamOperatorTestHarness<>( + new CEPPatternOperator<>( + Event.createTypeSerializer(), + false, + new NFAFactory())); + + harness.setup(); + harness.restore(snapshot, 1); + harness.open(); + + harness.processWatermark(new Watermark(Long.MIN_VALUE)); + + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + + // if element timestamps are not correctly checkpointed/restored this will lead to + // a pruning time underflow exception in NFA + harness.processWatermark(new Watermark(2)); + + // simulate snapshot/restore with empty element queue but NFA state + StreamTaskState snapshot2 = harness.snapshot(1, 1); + + harness = new OneInputStreamOperatorTestHarness<>( + new CEPPatternOperator<>( + Event.createTypeSerializer(), + false, + new NFAFactory())); + + harness.setup(); + harness.restore(snapshot2, 2); + harness.open(); + + harness.processElement(new StreamRecord(middleEvent, 3)); + harness.processElement(new StreamRecord(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord(endEvent, 5)); + + harness.processWatermark(new Watermark(Long.MAX_VALUE)); + + ConcurrentLinkedQueue result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord resultRecord = (StreamRecord) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap = (Map) resultRecord.getValue(); + + assertEquals(startEvent, patternMap.get("start")); + assertEquals(middleEvent, patternMap.get("middle")); + assertEquals(endEvent, patternMap.get("end")); + + harness.close(); + } + + + @Test public void testKeyedCEPOperatorCheckpointing() throws Exception { KeySelector keySelector = new KeySelector() { @@ -128,12 +217,33 @@ public Integer getKey(Event value) throws Exception { harness.processElement(new StreamRecord(startEvent, 1)); harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + + // simulate snapshot/restore with some elements in internal sorting queue + StreamTaskState snapshot = harness.snapshot(0, 0); + + harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + harness.setup(); + harness.restore(snapshot, 1); + harness.open(); + + harness.processWatermark(new Watermark(Long.MIN_VALUE)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + // if element timestamps are not correctly checkpointed/restored this will lead to + // a pruning time underflow exception in NFA harness.processWatermark(new Watermark(2)); - // simulate snapshot/restore - StreamTaskState snapshot = harness.snapshot(0, 0); + // simulate snapshot/restore with empty element queue but NFA state + StreamTaskState snapshot2 = harness.snapshot(1, 1); harness = new OneInputStreamOperatorTestHarness<>( new KeyedCEPPatternOperator<>( @@ -145,7 +255,7 @@ public Integer getKey(Event value) throws Exception { harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); harness.setup(); - harness.restore(snapshot, 1); + harness.restore(snapshot2, 2); harness.open(); harness.processElement(new StreamRecord(middleEvent, 3)); @@ -156,6 +266,7 @@ public Integer getKey(Event value) throws Exception { ConcurrentLinkedQueue result = harness.getOutput(); + // watermark and the result assertEquals(2, result.size()); Object resultObject = result.poll(); @@ -203,7 +314,10 @@ public boolean filter(SubEvent value) throws Exception { public boolean filter(Event value) throws Exception { return value.getName().equals("end"); } - }); + }) + // add a window timeout to test whether timestamps of elements in the + // priority queue in CEP operator are correctly checkpointed/restored + .within(Time.milliseconds(10)); return NFACompiler.compile(pattern, Event.createTypeSerializer(), false); } From bdf24f56bf39ae2e6a7bb7e7b8488881395112f8 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 7 Jul 2016 14:48:13 +0200 Subject: [PATCH 4/8] Fix Displayed CEP Operator Names --- .../org/apache/flink/cep/operator/CEPOperatorUtils.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index a3fffa5345d3e..548be16ba87fb 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -36,7 +36,6 @@ import java.util.Map; public class CEPOperatorUtils { - private static final String PATTERN_OPERATOR_NAME = "AbstractCEPPatternOperator"; /** * Creates a data stream containing the fully matching event patterns of the NFA computation. @@ -64,7 +63,7 @@ public static DataStream> createPatternStream(DataStream keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig()); patternStream = keyedStream.transform( - PATTERN_OPERATOR_NAME, + "KeyedCEPPatternOperator", (TypeInformation>) (TypeInformation) TypeExtractor.getForClass(Map.class), new KeyedCEPPatternOperator<>( inputSerializer, @@ -74,7 +73,7 @@ public static DataStream> createPatternStream(DataStream>) (TypeInformation) TypeExtractor.getForClass(Map.class), new CEPPatternOperator( inputSerializer, @@ -119,7 +118,7 @@ public static DataStream, Long>, Map keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig()); patternStream = keyedStream.transform( - PATTERN_OPERATOR_NAME, + "TimeoutKeyedCEPPatternOperator", eitherTypeInformation, new TimeoutKeyedCEPPatternOperator( inputSerializer, @@ -129,7 +128,7 @@ public static DataStream, Long>, Map( inputSerializer, From b87b2cd2a48ea95afbff257224971c35674a251c Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 7 Jul 2016 15:12:44 +0200 Subject: [PATCH 5/8] [FLINK-4165] Add warning about equals/hashCode to CEP doc --- docs/apis/streaming/libs/cep.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/docs/apis/streaming/libs/cep.md b/docs/apis/streaming/libs/cep.md index d465519dcda53..57af91695ebcf 100644 --- a/docs/apis/streaming/libs/cep.md +++ b/docs/apis/streaming/libs/cep.md @@ -35,6 +35,13 @@ It allows you to easily detect complex event patterns in a stream of endless dat Complex events can then be constructed from matching sequences. This gives you the opportunity to quickly get hold of what's really important in your data. +Attention The events in the `DataStream` to which +you want to apply pattern matching have to implement proper `equals()` and `hashCode()` methods +because these are used for comparing and matching events. + +* This will be replaced by the TOC +{:toc} + ## Getting Started If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/apis/batch/index.html#linking-with-flink). @@ -518,10 +525,10 @@ The collector can be used to emit an arbitrary number of events. val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect{ - (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) => + (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) => out.collect(TimeoutEvent()) } { - (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) => + (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) => out.collect(ComplexEvent()) } {% endhighlight %} From ba85f732ab0d1edb6a92b3abaa11b3851e4b5bd3 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 7 Jul 2016 16:45:46 +0200 Subject: [PATCH 6/8] Allow Setting StateBackend in OneInputStreamOperatorTestHarness --- .../OneInputStreamOperatorTestHarness.java | 54 ++++++++++++++++--- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 66bdb57488ecf..4432d9fcb6ea9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -27,6 +27,9 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; +import org.apache.flink.runtime.state.AsynchronousStateHandle; +import org.apache.flink.runtime.state.KvStateSnapshot; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -42,7 +45,10 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.Serializable; import java.util.Collection; +import java.util.HashMap; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; @@ -76,6 +82,9 @@ public class OneInputStreamOperatorTestHarness { StreamTask mockTask; + // use this as default for tests + private AbstractStateBackend stateBackend = new MemoryStateBackend(); + /** * Whether setup() was called on the operator. This is reset when calling close(). */ @@ -86,12 +95,16 @@ public OneInputStreamOperatorTestHarness(OneInputStreamOperator operato this(operator, new ExecutionConfig()); } - public OneInputStreamOperatorTestHarness(OneInputStreamOperator operator, ExecutionConfig executionConfig) { + public OneInputStreamOperatorTestHarness( + OneInputStreamOperator operator, + ExecutionConfig executionConfig) { this(operator, executionConfig, DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor())); } - public OneInputStreamOperatorTestHarness(OneInputStreamOperator operator, ExecutionConfig executionConfig, - TimeServiceProvider testTimeProvider) { + public OneInputStreamOperatorTestHarness( + OneInputStreamOperator operator, + ExecutionConfig executionConfig, + TimeServiceProvider testTimeProvider) { this.operator = operator; this.outputList = new ConcurrentLinkedQueue(); this.config = new StreamConfig(new Configuration()); @@ -114,9 +127,9 @@ public OneInputStreamOperatorTestHarness(OneInputStreamOperator operato public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { final String operatorIdentifier = (String) invocationOnMock.getArguments()[0]; final TypeSerializer keySerializer = (TypeSerializer) invocationOnMock.getArguments()[1]; - MemoryStateBackend backend = MemoryStateBackend.create(); - backend.initializeForJob(env, operatorIdentifier, keySerializer); - return backend; + OneInputStreamOperatorTestHarness.this.stateBackend.disposeAllStateForCurrentJob(); + OneInputStreamOperatorTestHarness.this.stateBackend.initializeForJob(env, operatorIdentifier, keySerializer); + return OneInputStreamOperatorTestHarness.this.stateBackend; } }).when(mockTask).createStateBackend(any(String.class), any(TypeSerializer.class)); } catch (Exception e) { @@ -143,6 +156,10 @@ public Long answer(InvocationOnMock invocation) throws Throwable { }).when(mockTask).getCurrentProcessingTime(); } + public void setStateBackend(AbstractStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + public Object getCheckpointLock() { return mockTask.getCheckpointLock(); } @@ -191,7 +208,30 @@ public void open() throws Exception { * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotOperatorState(long, long)} ()} */ public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception { - return operator.snapshotOperatorState(checkpointId, timestamp); + StreamTaskState snapshot = operator.snapshotOperatorState(checkpointId, timestamp); + // materialize asynchronous state handles + if (snapshot != null) { + if (snapshot.getFunctionState() instanceof AsynchronousStateHandle) { + AsynchronousStateHandle asyncState = (AsynchronousStateHandle) snapshot.getFunctionState(); + snapshot.setFunctionState(asyncState.materialize()); + } + if (snapshot.getOperatorState() instanceof AsynchronousStateHandle) { + AsynchronousStateHandle asyncState = (AsynchronousStateHandle) snapshot.getOperatorState(); + snapshot.setOperatorState(asyncState.materialize()); + } + if (snapshot.getKvStates() != null) { + Set keys = snapshot.getKvStates().keySet(); + HashMap> kvStates = snapshot.getKvStates(); + for (String key: keys) { + if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) { + AsynchronousKvStateSnapshot asyncHandle = (AsynchronousKvStateSnapshot) kvStates.get(key); + kvStates.put(key, asyncHandle.materialize()); + } + } + } + + } + return snapshot; } /** From 277f46d0f4ddd20446b87ed6535332754b97be76 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 7 Jul 2016 16:46:29 +0200 Subject: [PATCH 7/8] [FLINK-4169] Fix State Handling in CEP Before, ValueState.update() was not called after changing the NFA or the priority queue in CEP operators. This means that the operators don't work with state backends such as RocksDB that strictly require that update() be called when state changes. This changes the operators to always call update() and also introduces a test that verifies the changes. --- flink-libraries/flink-cep/pom.xml | 8 ++ .../AbstractCEPBasePatternOperator.java | 6 + .../operator/AbstractCEPPatternOperator.java | 10 ++ .../AbstractKeyedCEPPatternOperator.java | 23 +++- .../flink/cep/operator/CEPOperatorTest.java | 121 +++++++++++++++++- 5 files changed, 166 insertions(+), 2 deletions(-) diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml index 32b26d9967e52..5640eb85d5fd2 100644 --- a/flink-libraries/flink-cep/pom.xml +++ b/flink-libraries/flink-cep/pom.xml @@ -83,6 +83,14 @@ under the License. test + + org.apache.flink + flink-statebackend-rocksdb_2.10 + ${project.version} + test + + + diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java index 44649acc5d446..aad408cf7037c 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java @@ -58,14 +58,19 @@ public TypeSerializer getInputSerializer() { protected abstract NFA getNFA() throws IOException; + protected abstract void updateNFA(NFA nfa) throws IOException; + protected abstract PriorityQueue> getPriorityQueue() throws IOException; + protected abstract void updatePriorityQueue(PriorityQueue> queue) throws IOException; + @Override public void processElement(StreamRecord element) throws Exception { if (isProcessingTime) { // there can be no out of order elements in processing time NFA nfa = getNFA(); processEvent(nfa, element.getValue(), System.currentTimeMillis()); + updateNFA(nfa); } else { PriorityQueue> priorityQueue = getPriorityQueue(); @@ -77,6 +82,7 @@ public void processElement(StreamRecord element) throws Exception { } else { priorityQueue.offer(element); } + updatePriorityQueue(priorityQueue); } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java index ceaf8c5a9fac6..e71a561eeb297 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java @@ -77,11 +77,21 @@ protected NFA getNFA() throws IOException { return nfa; } + @Override + protected void updateNFA(NFA nfa) { + // a no-op, because we only have one NFA + } + @Override protected PriorityQueue> getPriorityQueue() throws IOException { return priorityQueue; } + @Override + protected void updatePriorityQueue(PriorityQueue> queue) { + // a no-op, because we only have one priority queue + } + @Override public void processWatermark(Watermark mark) throws Exception { while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 1f2095b315525..59e703a1f68f8 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -131,6 +131,11 @@ protected NFA getNFA() throws IOException { return nfa; } + @Override + protected void updateNFA(NFA nfa) throws IOException { + nfaOperatorState.update(nfa); + } + @Override protected PriorityQueue> getPriorityQueue() throws IOException { PriorityQueue> priorityQueue = priorityQueueOperatorState.value(); @@ -144,6 +149,11 @@ protected PriorityQueue> getPriorityQueue() throws IOException return priorityQueue; } + @Override + protected void updatePriorityQueue(PriorityQueue> queue) throws IOException { + priorityQueueOperatorState.update(queue); + } + @Override public void processElement(StreamRecord element) throws Exception { keys.add(keySelector.getKey(element.getValue())); @@ -158,7 +168,6 @@ public void processWatermark(Watermark mark) throws Exception { setKeyContext(key); PriorityQueue> priorityQueue = getPriorityQueue(); - NFA nfa = getNFA(); while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { @@ -166,6 +175,8 @@ public void processWatermark(Watermark mark) throws Exception { processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); } + updateNFA(nfa); + updatePriorityQueue(priorityQueue); } output.emitWatermark(mark); @@ -336,5 +347,15 @@ private static class PriorityQueueStreamRecordFactory implements PriorityQueu public PriorityQueue> createPriorityQueue() { return new PriorityQueue>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator()); } + + @Override + public boolean equals(Object obj) { + return obj instanceof PriorityQueueStreamRecordFactory; + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 1f53d97f24f3c..56c41615f19b3 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -27,13 +27,18 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + import static org.junit.Assert.*; import java.util.Map; @@ -41,6 +46,9 @@ public class CEPOperatorTest extends TestLogger { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @Test public void testCEPOperatorWatermarkForwarding() throws Exception { OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( @@ -186,10 +194,112 @@ public Integer getKey(Event value) throws Exception { harness.close(); } + @Test + public void testKeyedCEPOperatorCheckpointing() throws Exception { + + KeySelector keySelector = new KeySelector() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + + harness.open(); + + Event startEvent = new Event(42, "start", 1.0); + SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + Event endEvent= new Event(42, "end", 1.0); + + harness.processElement(new StreamRecord(startEvent, 1)); + harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + + // simulate snapshot/restore with some elements in internal sorting queue + StreamTaskState snapshot = harness.snapshot(0, 0); + + harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + harness.setup(); + harness.restore(snapshot, 1); + harness.open(); + + harness.processWatermark(new Watermark(Long.MIN_VALUE)); + + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + + // if element timestamps are not correctly checkpointed/restored this will lead to + // a pruning time underflow exception in NFA + harness.processWatermark(new Watermark(2)); + + // simulate snapshot/restore with empty element queue but NFA state + StreamTaskState snapshot2 = harness.snapshot(1, 1); + + harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + harness.setup(); + harness.restore(snapshot2, 2); + harness.open(); + + harness.processElement(new StreamRecord(middleEvent, 3)); + harness.processElement(new StreamRecord(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord(endEvent, 5)); + + harness.processWatermark(new Watermark(Long.MAX_VALUE)); + ConcurrentLinkedQueue result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord resultRecord = (StreamRecord) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap = (Map) resultRecord.getValue(); + + assertEquals(startEvent, patternMap.get("start")); + assertEquals(middleEvent, patternMap.get("middle")); + assertEquals(endEvent, patternMap.get("end")); + + harness.close(); + } @Test - public void testKeyedCEPOperatorCheckpointing() throws Exception { + public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception { + + String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); + String rocksDbBackups = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDBStateBackend = + new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + KeySelector keySelector = new KeySelector() { private static final long serialVersionUID = -4873366487571254798L; @@ -207,6 +317,7 @@ public Integer getKey(Event value) throws Exception { IntSerializer.INSTANCE, new NFAFactory())); + harness.setStateBackend(rocksDBStateBackend); harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); harness.open(); @@ -229,6 +340,10 @@ public Integer getKey(Event value) throws Exception { IntSerializer.INSTANCE, new NFAFactory())); + rocksDBStateBackend = + new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + harness.setStateBackend(rocksDBStateBackend); harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); harness.setup(); harness.restore(snapshot, 1); @@ -253,6 +368,10 @@ public Integer getKey(Event value) throws Exception { IntSerializer.INSTANCE, new NFAFactory())); + rocksDBStateBackend = + new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); + rocksDBStateBackend.setDbStoragePath(rocksDbPath); + harness.setStateBackend(rocksDBStateBackend); harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); harness.setup(); harness.restore(snapshot2, 2); From ca05250a5a7f2a4913f1a4c762d635991333c7ca Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 13 Jul 2016 17:24:52 +0200 Subject: [PATCH 8/8] Fixup: [FLINK-4149] Fix Serialization of NFA in AbstractKeyedCEPPatternOperator --- .../java/org/apache/flink/cep/nfa/NFA.java | 46 ++++++------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 28b116db171e4..624db0de5fd0f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -19,9 +19,10 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.LinkedHashMultimap; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -29,6 +30,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -516,42 +518,24 @@ public int getLength() { @Override public void serialize(NFA record, DataOutputView target) throws IOException { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - - oos.writeObject(record); - - oos.close(); - baos.close(); - - byte[] data = baos.toByteArray(); - - target.writeInt(data.length); - target.write(data); - } catch (IOException e) { - throw new RuntimeException("Could not serialize NFA.", e); - } + ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target)); + oos.writeObject(record); + oos.close(); } @Override public NFA deserialize(DataInputView source) throws IOException { - try { - int size = source.readInt(); - - byte[] data = new byte[size]; - - source.readFully(data); - - ByteArrayInputStream bais = new ByteArrayInputStream(data); - ObjectInputStream ois = new ObjectInputStream(bais); + ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source)); + try { @SuppressWarnings("unchecked") - NFA copy = (NFA) ois.readObject(); - return copy; - } catch (IOException|ClassNotFoundException e) { + NFA nfa = null; + nfa = (NFA) ois.readObject(); + return nfa; + } catch (ClassNotFoundException e) { throw new RuntimeException("Could not deserialize NFA.", e); - } } + } + } @Override public NFA deserialize(NFA reuse, DataInputView source) throws IOException { @@ -567,7 +551,7 @@ public void copy(DataInputView source, DataOutputView target) throws IOException @Override public boolean equals(Object obj) { - return obj instanceof Serializer; + return obj instanceof Serializer && ((Serializer) obj).canEqual(this); } @Override