From 546e2ad73165b00d93c3c460372b9d49a4b5d8b7 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Mon, 22 May 2017 11:43:42 +0200 Subject: [PATCH] [FLINK-6656] [cep] Change element PriorityQueue to MapState. This is to leverage the fact that RocksDB already returns the keys sorted. So now elements, instead of being stores in a PQ and all of them being deserialized and serialized at each incoming element, the are stored in a MapState with the key being the timestamp and the value, a List of elements that refer to the same timestamp. --- .../AbstractKeyedCEPPatternOperator.java | 152 ++++++++++++------ .../cep/operator/CEPMigration11to13Test.java | 7 +- .../flink/cep/operator/CEPOperatorTest.java | 19 +-- .../src/test/resources/cep-keyed-1_1-snapshot | Bin 5612 -> 5674 bytes .../test/resources/cep-non-keyed-1.1-snapshot | Bin 3274 -> 3336 bytes 5 files changed, 118 insertions(+), 60 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 7b6e5e3f33dea..af4b53e59e487 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -19,6 +19,8 @@ package org.apache.flink.cep.operator; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompatibilityResult; @@ -29,6 +31,8 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.fs.FSDataInputStream; @@ -54,6 +58,10 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.PriorityQueue; @@ -86,12 +94,11 @@ public abstract class AbstractKeyedCEPPatternOperator /////////////// State ////////////// private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName"; - private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName"; + private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName"; private transient ValueState> nfaOperatorState; - private transient ValueState>> priorityQueueOperatorState; + private transient MapState> elementQueueState; - private final PriorityQueueFactory> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>(); private final NFACompiler.NFAFactory nfaFactory; private transient InternalTimerService timerService; @@ -134,19 +141,13 @@ public void initializeState(StateInitializationContext context) throws Exception new NFA.NFASerializer<>(inputSerializer))); } - @SuppressWarnings("unchecked,rawtypes") - TypeSerializer> streamRecordSerializer = - (TypeSerializer) new StreamElementSerializer<>(inputSerializer); - - if (priorityQueueOperatorState == null) { - priorityQueueOperatorState = getRuntimeContext().getState( - new ValueStateDescriptor<>( - PRIORITY_QUEUE_STATE_NAME, - new PriorityQueueSerializer<>( - streamRecordSerializer, - new PriorityQueueStreamRecordFactory() + if (elementQueueState == null) { + elementQueueState = getRuntimeContext().getMapState( + new MapStateDescriptor<>( + EVENT_QUEUE_STATE_NAME, + LongSerializer.INSTANCE, + new ListSerializer<>(inputSerializer) ) - ) ); } } @@ -171,25 +172,32 @@ public void processElement(StreamRecord element) throws Exception { } else { + long timestamp = element.getTimestamp(); + IN value = element.getValue(); + // In event-time processing we assume correctness of the watermark. // Events with timestamp smaller than the last seen watermark are considered late. // Late events are put in a dedicated side output, if the user has specified one. - if (element.getTimestamp() >= lastWatermark) { + if (timestamp >= lastWatermark) { // we have an event with a valid timestamp, so // we buffer it until we receive the proper watermark. saveRegisterWatermarkTimer(); - PriorityQueue> priorityQueue = getPriorityQueue(); + List elementsForTimestamp = elementQueueState.get(timestamp); + if (elementsForTimestamp == null) { + elementsForTimestamp = new ArrayList<>(); + } + if (getExecutionConfig().isObjectReuseEnabled()) { // copy the StreamRecord so that it cannot be changed - priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp())); + elementsForTimestamp.add(inputSerializer.copy(value)); } else { - priorityQueue.offer(element); + elementsForTimestamp.add(element.getValue()); } - updatePriorityQueue(priorityQueue); + elementQueueState.put(timestamp, elementsForTimestamp); } } } @@ -218,23 +226,28 @@ public void onEventTime(InternalTimer timer) throws Exceptio // 5) update the last seen watermark. // STEP 1 - PriorityQueue> priorityQueue = getPriorityQueue(); + PriorityQueue sortedTimestamps = getSortedTimestamps(); NFA nfa = getNFA(); // STEP 2 - while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= timerService.currentWatermark()) { - StreamRecord streamRecord = priorityQueue.poll(); - processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); + while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) { + long timestamp = sortedTimestamps.poll(); + for (IN element: elementQueueState.get(timestamp)) { + processEvent(nfa, element, timestamp); + } + elementQueueState.remove(timestamp); } // STEP 3 advanceTime(nfa, timerService.currentWatermark()); // STEP 4 - updatePriorityQueue(priorityQueue); + if (sortedTimestamps.isEmpty()) { + elementQueueState.clear(); + } updateNFA(nfa); - if (!priorityQueue.isEmpty() || !nfa.isEmpty()) { + if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) { saveRegisterWatermarkTimer(); } @@ -264,17 +277,12 @@ private void updateNFA(NFA nfa) throws IOException { } } - private PriorityQueue> getPriorityQueue() throws IOException { - PriorityQueue> priorityQueue = priorityQueueOperatorState.value(); - return priorityQueue != null ? priorityQueue : priorityQueueFactory.createPriorityQueue(); - } - - private void updatePriorityQueue(PriorityQueue> queue) throws IOException { - if (queue.isEmpty()) { - priorityQueueOperatorState.clear(); - } else { - priorityQueueOperatorState.update(queue); + private PriorityQueue getSortedTimestamps() throws Exception { + PriorityQueue sortedTimestamps = new PriorityQueue<>(); + for (Long timestamp: elementQueueState.keys()) { + sortedTimestamps.offer(timestamp); } + return sortedTimestamps; } /** @@ -318,6 +326,18 @@ public void restoreState(FSDataInputStream in) throws Exception { ValueState> oldNfaOperatorState = getRuntimeContext().getState( new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer())); + ValueState>> oldPriorityQueueOperatorState = + getRuntimeContext().getState( + new ValueStateDescriptor<>( + "priorityQueueStateName", + new PriorityQueueSerializer<>( + ((TypeSerializer) new StreamElementSerializer<>(inputSerializer)), + new PriorityQueueStreamRecordFactory() + ) + ) + ); + + if (migratingFromOldKeyedOperator) { int numberEntries = inputView.readInt(); for (int i = 0; i < numberEntries; i++) { @@ -328,6 +348,30 @@ public void restoreState(FSDataInputStream in) throws Exception { NFA nfa = oldNfaOperatorState.value(); oldNfaOperatorState.clear(); nfaOperatorState.update(nfa); + + PriorityQueue> priorityQueue = oldPriorityQueueOperatorState.value(); + if (priorityQueue != null && !priorityQueue.isEmpty()) { + Map> elementMap = new HashMap<>(); + for (StreamRecord record: priorityQueue) { + long timestamp = record.getTimestamp(); + IN element = record.getValue(); + + List elements = elementMap.get(timestamp); + if (elements == null) { + elements = new ArrayList<>(); + elementMap.put(timestamp, elements); + } + elements.add(element); + } + + // write the old state into the new one. + for (Map.Entry> entry: elementMap.entrySet()) { + elementQueueState.put(entry.getKey(), entry.getValue()); + } + + // clear the old state + oldPriorityQueueOperatorState.clear(); + } } } else { @@ -339,22 +383,35 @@ public void restoreState(FSDataInputStream in) throws Exception { // retrieve the elements that were pending in the priority queue MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer); - PriorityQueue> priorityQueue = priorityQueueFactory.createPriorityQueue(); + + Map> elementMap = new HashMap<>(); int entries = ois.readInt(); for (int i = 0; i < entries; i++) { StreamElement streamElement = recordSerializer.deserialize(inputView); - priorityQueue.offer(streamElement.asRecord()); + StreamRecord record = streamElement.asRecord(); + + long timestamp = record.getTimestamp(); + IN element = record.getValue(); + + List elements = elementMap.get(timestamp); + if (elements == null) { + elements = new ArrayList<>(); + elementMap.put(timestamp, elements); + } + elements.add(element); } // finally register the retrieved state with the new keyed state. setCurrentKey((byte) 0); nfaOperatorState.update(nfa); - priorityQueueOperatorState.update(priorityQueue); + + // write the priority queue to the new map state. + for (Map.Entry> entry: elementMap.entrySet()) { + elementQueueState.put(entry.getKey(), entry.getValue()); + } if (!isProcessingTime) { // this is relevant only for event/ingestion time - - // need to work around type restrictions setCurrentKey((byte) 0); saveRegisterWatermarkTimer(); } @@ -546,15 +603,18 @@ public boolean hasNonEmptyNFA(KEY key) throws IOException { } @VisibleForTesting - public boolean hasNonEmptyPQ(KEY key) throws IOException { + public boolean hasNonEmptyPQ(KEY key) throws Exception { setCurrentKey(key); - return priorityQueueOperatorState.value() != null; + return elementQueueState.keys().iterator().hasNext(); } @VisibleForTesting - public int getPQSize(KEY key) throws IOException { + public int getPQSize(KEY key) throws Exception { setCurrentKey(key); - PriorityQueue> pq = getPriorityQueue(); - return pq == null ? -1 : pq.size(); + int counter = 0; + for (List elements: elementQueueState.values()) { + counter += elements.size(); + } + return counter; } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 255b8c21aec49..d575e4309941f 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -87,6 +87,9 @@ public Integer getKey(Event value) throws Exception { harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); harness.processWatermark(new Watermark(2)); + + harness.processElement(new StreamRecord(middleEvent, 3)); + // simulate snapshot/restore with empty element queue but NFA state StreamTaskState snapshot = harness.snapshot(1, 1); FileOutputStream out = new FileOutputStream( @@ -112,7 +115,6 @@ public Integer getKey(Event value) throws Exception { harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot")); harness.open(); - harness.processElement(new StreamRecord(middleEvent, 3)); harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); harness.processElement(new StreamRecord<>(endEvent, 5)); @@ -206,6 +208,8 @@ public void testNonKeyedCEPFunctionMigration() throws Exception { harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); harness.processWatermark(new Watermark(2)); + harness.processElement(new StreamRecord(middleEvent, 3)); + // simulate snapshot/restore with empty element queue but NFA state StreamTaskState snapshot = harness.snapshot(1, 1); FileOutputStream out = new FileOutputStream( @@ -233,7 +237,6 @@ public void testNonKeyedCEPFunctionMigration() throws Exception { harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot")); harness.open(); - harness.processElement(new StreamRecord(middleEvent, 3)); harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); harness.processElement(new StreamRecord<>(endEvent, 5)); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 95e3a37fa2d49..ab63479595139 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -43,7 +43,6 @@ import org.apache.flink.types.Either; import org.apache.flink.util.TestLogger; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -140,8 +139,6 @@ public void testKeyedCEPOperatorCheckpointing() throws Exception { } @Test - @Ignore - // TODO: 5/19/17 Re-instate when checkpoints are fixed public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception { String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); @@ -306,11 +303,11 @@ public void testCEPOperatorCleanupEventTime() throws Exception { harness.processWatermark(new Watermark(Long.MIN_VALUE)); - harness.processElement(new StreamRecord<>(startEvent1, 1L)); - harness.processElement(new StreamRecord<>(startEventK2, 1L)); harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); harness.processElement(new StreamRecord(middleEvent1, 2L)); harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); + harness.processElement(new StreamRecord<>(startEvent1, 1L)); + harness.processElement(new StreamRecord<>(startEventK2, 1L)); // there must be 2 keys 42, 43 registered for the watermark callback // all the seen elements must be in the priority queues but no NFA yet. @@ -404,13 +401,13 @@ public void testCEPOperatorCleanupEventTimeWithSameElements() throws Exception { harness.processWatermark(new Watermark(Long.MIN_VALUE)); + harness.processElement(new StreamRecord<>(middle2Event1, 6)); + harness.processElement(new StreamRecord<>(middle1Event3, 7)); harness.processElement(new StreamRecord<>(startEvent, 1)); harness.processElement(new StreamRecord<>(middle1Event1, 3)); - harness.processElement(new StreamRecord<>(middle1Event1, 3)); // this and the following get reordered harness.processElement(new StreamRecord<>(middle1Event2, 3)); + harness.processElement(new StreamRecord<>(middle1Event1, 3)); harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5)); - harness.processElement(new StreamRecord<>(middle2Event1, 6)); - harness.processElement(new StreamRecord<>(middle1Event3, 7)); assertEquals(1L, harness.numEventTimeTimers()); assertEquals(7L, operator.getPQSize(41)); @@ -554,8 +551,6 @@ public void testCEPOperatorCleanupProcessingTime() throws Exception { } @Test - @Ignore - // TODO: 5/19/17 Re-instate when checkpoints are fixed public void testCEPOperatorSerializationWRocksDB() throws Exception { String rocksDbPath = tempFolder.newFolder().getAbsolutePath(); RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); @@ -626,13 +621,13 @@ public NFA createNFA() { harness.processElement(new StreamRecord<>(startEvent1, 1)); harness.processElement(new StreamRecord(middleEvent1, 2)); harness.processWatermark(2L); + harness.processElement(new StreamRecord(middleEvent3, 5)); harness.processElement(new StreamRecord(middleEvent2, 3)); harness.processElement(new StreamRecord<>(startEvent2, 4)); - harness.processElement(new StreamRecord(middleEvent3, 5)); harness.processWatermark(5L); - harness.processElement(new StreamRecord(middleEvent4, 5)); harness.processElement(new StreamRecord<>(nextOne, 6)); harness.processElement(new StreamRecord<>(endEvent, 8)); + harness.processElement(new StreamRecord(middleEvent4, 5)); harness.processWatermark(100L); List> resultingPatterns = new ArrayList<>(); diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot index 277de1da359cd97932aa788d75585f21b9e9df85..75655c65c6dce6aa868a18142e9a9d1f4c55b48a 100644 GIT binary patch delta 70 zcmaE(y-H_;GcUJEAwwO56^M}E?8m#4NoWEC0|O%vg9$Al%`{n3z!8YGdDHUW+kXH_ OF)%o&Yz`5a!3qF1A`bZg delta 56 zcmZ3b^G17vGcUJ%AwwO56^Jm|?8m#4NhpYcfq@Z-!Gsowo*X5hJ=sM-Z1W6(Nvr^L C6bsb= diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot index b5ca51e04163762c025005e3f82af5acbd200426..68ca0ec86d8b8f2dac04236878fd7c855d84d5a5 100644 GIT binary patch delta 76 zcmX>l*&(%|oRM?FnzszB#YGGIwiTGoRKqV&07Z6;vxo_$qzY1CkL>zY@W$j!No0K$WX^%1tLr)%kbndGEP3l Kqdj>Gj~D>=^bnE&