[FLINK-6656] [cep] Change element PriorityQueue to MapState.
This is to leverage the fact that RocksDB already returns the
keys sorted. So now, instead of being stored in a PriorityQueue
and all of them being deserialized and re-serialized at each
incoming element, the elements are stored in a MapState with the
key being the timestamp and the value a List of the elements
that refer to that same timestamp.
kl0u committed May 23, 2017
1 parent 6f570e7 commit 546e2ad
Showing 5 changed files with 118 additions and 60 deletions.
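The change, in essence: the per-key buffer moves from a ValueState<PriorityQueue<StreamRecord<IN>>>, which had to be read and rewritten wholesale on every element, to a MapState<Long, List<IN>> keyed by timestamp, so each incoming element touches only the single map entry for its timestamp. Below is a minimal sketch of the two access paths, with illustrative names — ElementQueueSketch, bufferEvent, drainTo and process are not part of the patch, and elementQueueState is assumed to be registered via getRuntimeContext().getMapState(...) exactly as the diff further down does:

import org.apache.flink.api.common.state.MapState;

import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class ElementQueueSketch<IN> {

    // timestamp -> all buffered elements that carry exactly that timestamp
    private final MapState<Long, List<IN>> elementQueueState;

    ElementQueueSketch(MapState<Long, List<IN>> elementQueueState) {
        this.elementQueueState = elementQueueState;
    }

    // processElement path: only the list for this one timestamp is read,
    // extended and written back -- the rest of the buffer stays untouched.
    void bufferEvent(IN value, long timestamp) throws Exception {
        List<IN> elementsForTimestamp = elementQueueState.get(timestamp);
        if (elementsForTimestamp == null) {
            elementsForTimestamp = new ArrayList<>();
        }
        elementsForTimestamp.add(value);
        elementQueueState.put(timestamp, elementsForTimestamp);
    }

    // onEventTime path: collect the buffered timestamps, then feed every
    // element at or below the watermark downstream in timestamp order.
    void drainTo(long watermark) throws Exception {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
        for (Long timestamp : elementQueueState.keys()) {
            sortedTimestamps.offer(timestamp);
        }
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= watermark) {
            long timestamp = sortedTimestamps.poll();
            for (IN element : elementQueueState.get(timestamp)) {
                process(element, timestamp);
            }
            elementQueueState.remove(timestamp);
        }
    }

    // stand-in for processEvent(nfa, element, timestamp) in the real operator
    private void process(IN element, long timestamp) {
    }
}

Per the commit message, RocksDB already returns the keys sorted; the heap-side PriorityQueue<Long> over the (few, deduplicated) timestamps makes the drain order explicit for any state backend.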
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.operator;

 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -29,6 +31,8 @@
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -54,6 +58,10 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;


@@ -86,12 +94,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	/////////////// State //////////////

 	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
-	private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName";
+	private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";

 	private transient ValueState<NFA<IN>> nfaOperatorState;
-	private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
+	private transient MapState<Long, List<IN>> elementQueueState;

-	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
 	private final NFACompiler.NFAFactory<IN> nfaFactory;

 	private transient InternalTimerService<VoidNamespace> timerService;
@@ -134,19 +141,13 @@ public void initializeState(StateInitializationContext context) throws Exception {
 				new NFA.NFASerializer<>(inputSerializer)));
 		}

-		@SuppressWarnings("unchecked,rawtypes")
-		TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
-			(TypeSerializer) new StreamElementSerializer<>(inputSerializer);
-
-		if (priorityQueueOperatorState == null) {
-			priorityQueueOperatorState = getRuntimeContext().getState(
-				new ValueStateDescriptor<>(
-					PRIORITY_QUEUE_STATE_NAME,
-					new PriorityQueueSerializer<>(
-						streamRecordSerializer,
-						new PriorityQueueStreamRecordFactory<IN>()
-					)
-				)
-			);
+		if (elementQueueState == null) {
+			elementQueueState = getRuntimeContext().getMapState(
+					new MapStateDescriptor<>(
+							EVENT_QUEUE_STATE_NAME,
+							LongSerializer.INSTANCE,
+							new ListSerializer<>(inputSerializer)
+					)
+			);
 		}
 	}
Expand All @@ -171,25 +172,32 @@ public void processElement(StreamRecord<IN> element) throws Exception {


} else { } else {


long timestamp = element.getTimestamp();
IN value = element.getValue();

// In event-time processing we assume correctness of the watermark. // In event-time processing we assume correctness of the watermark.
// Events with timestamp smaller than the last seen watermark are considered late. // Events with timestamp smaller than the last seen watermark are considered late.
// Late events are put in a dedicated side output, if the user has specified one. // Late events are put in a dedicated side output, if the user has specified one.


if (element.getTimestamp() >= lastWatermark) { if (timestamp >= lastWatermark) {


// we have an event with a valid timestamp, so // we have an event with a valid timestamp, so
// we buffer it until we receive the proper watermark. // we buffer it until we receive the proper watermark.


saveRegisterWatermarkTimer(); saveRegisterWatermarkTimer();


PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); List<IN> elementsForTimestamp = elementQueueState.get(timestamp);
if (elementsForTimestamp == null) {
elementsForTimestamp = new ArrayList<>();
}

if (getExecutionConfig().isObjectReuseEnabled()) { if (getExecutionConfig().isObjectReuseEnabled()) {
// copy the StreamRecord so that it cannot be changed // copy the StreamRecord so that it cannot be changed
priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp())); elementsForTimestamp.add(inputSerializer.copy(value));
} else { } else {
priorityQueue.offer(element); elementsForTimestamp.add(element.getValue());
} }
updatePriorityQueue(priorityQueue); elementQueueState.put(timestamp, elementsForTimestamp);
} }
} }
} }
@@ -218,23 +226,28 @@ public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
 		// 5) update the last seen watermark.

 		// STEP 1
-		PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+		PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
 		NFA<IN> nfa = getNFA();

 		// STEP 2
-		while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= timerService.currentWatermark()) {
-			StreamRecord<IN> streamRecord = priorityQueue.poll();
-			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+		while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
+			long timestamp = sortedTimestamps.poll();
+			for (IN element: elementQueueState.get(timestamp)) {
+				processEvent(nfa, element, timestamp);
+			}
+			elementQueueState.remove(timestamp);
 		}

 		// STEP 3
 		advanceTime(nfa, timerService.currentWatermark());

 		// STEP 4
-		updatePriorityQueue(priorityQueue);
+		if (sortedTimestamps.isEmpty()) {
+			elementQueueState.clear();
+		}
 		updateNFA(nfa);

-		if (!priorityQueue.isEmpty() || !nfa.isEmpty()) {
+		if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
 			saveRegisterWatermarkTimer();
 		}


@@ -264,17 +277,12 @@ private void updateNFA(NFA<IN> nfa) throws IOException {
 		}
 	}

-	private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
-		PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
-		return priorityQueue != null ? priorityQueue : priorityQueueFactory.createPriorityQueue();
-	}
-
-	private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
-		if (queue.isEmpty()) {
-			priorityQueueOperatorState.clear();
-		} else {
-			priorityQueueOperatorState.update(queue);
-		}
+	private PriorityQueue<Long> getSortedTimestamps() throws Exception {
+		PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
+		for (Long timestamp: elementQueueState.keys()) {
+			sortedTimestamps.offer(timestamp);
+		}
+		return sortedTimestamps;
 	}

 	/**
Expand Down Expand Up @@ -318,6 +326,18 @@ public void restoreState(FSDataInputStream in) throws Exception {
ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState( ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState(
new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>())); new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>()));


ValueState<PriorityQueue<StreamRecord<IN>>> oldPriorityQueueOperatorState =
getRuntimeContext().getState(
new ValueStateDescriptor<>(
"priorityQueueStateName",
new PriorityQueueSerializer<>(
((TypeSerializer) new StreamElementSerializer<>(inputSerializer)),
new PriorityQueueStreamRecordFactory<IN>()
)
)
);


if (migratingFromOldKeyedOperator) { if (migratingFromOldKeyedOperator) {
int numberEntries = inputView.readInt(); int numberEntries = inputView.readInt();
for (int i = 0; i < numberEntries; i++) { for (int i = 0; i < numberEntries; i++) {
@@ -328,6 +348,30 @@
 					NFA<IN> nfa = oldNfaOperatorState.value();
 					oldNfaOperatorState.clear();
 					nfaOperatorState.update(nfa);
+
+					PriorityQueue<StreamRecord<IN>> priorityQueue = oldPriorityQueueOperatorState.value();
+					if (priorityQueue != null && !priorityQueue.isEmpty()) {
+						Map<Long, List<IN>> elementMap = new HashMap<>();
+						for (StreamRecord<IN> record: priorityQueue) {
+							long timestamp = record.getTimestamp();
+							IN element = record.getValue();
+
+							List<IN> elements = elementMap.get(timestamp);
+							if (elements == null) {
+								elements = new ArrayList<>();
+								elementMap.put(timestamp, elements);
+							}
+							elements.add(element);
+						}
+
+						// write the old state into the new one.
+						for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
+							elementQueueState.put(entry.getKey(), entry.getValue());
+						}
+
+						// clear the old state
+						oldPriorityQueueOperatorState.clear();
+					}
 				}
 			} else {


@@ -339,22 +383,35 @@ public void restoreState(FSDataInputStream in) throws Exception {

 			// retrieve the elements that were pending in the priority queue
 			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueFactory.createPriorityQueue();
+
+			Map<Long, List<IN>> elementMap = new HashMap<>();
 			int entries = ois.readInt();
 			for (int i = 0; i < entries; i++) {
 				StreamElement streamElement = recordSerializer.deserialize(inputView);
-				priorityQueue.offer(streamElement.<IN>asRecord());
+				StreamRecord<IN> record = streamElement.<IN>asRecord();
+
+				long timestamp = record.getTimestamp();
+				IN element = record.getValue();
+
+				List<IN> elements = elementMap.get(timestamp);
+				if (elements == null) {
+					elements = new ArrayList<>();
+					elementMap.put(timestamp, elements);
+				}
+				elements.add(element);
 			}

 			// finally register the retrieved state with the new keyed state.
 			setCurrentKey((byte) 0);
 			nfaOperatorState.update(nfa);
-			priorityQueueOperatorState.update(priorityQueue);
+
+			// write the priority queue to the new map state.
+			for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
+				elementQueueState.put(entry.getKey(), entry.getValue());
+			}

 			if (!isProcessingTime) {
 				// this is relevant only for event/ingestion time
-
-				// need to work around type restrictions
 				setCurrentKey((byte) 0);
 				saveRegisterWatermarkTimer();
 			}
@@ -546,15 +603,18 @@ public boolean hasNonEmptyNFA(KEY key) throws IOException {
 	}

 	@VisibleForTesting
-	public boolean hasNonEmptyPQ(KEY key) throws IOException {
+	public boolean hasNonEmptyPQ(KEY key) throws Exception {
 		setCurrentKey(key);
-		return priorityQueueOperatorState.value() != null;
+		return elementQueueState.keys().iterator().hasNext();
 	}

 	@VisibleForTesting
-	public int getPQSize(KEY key) throws IOException {
+	public int getPQSize(KEY key) throws Exception {
 		setCurrentKey(key);
-		PriorityQueue<StreamRecord<IN>> pq = getPriorityQueue();
-		return pq == null ? -1 : pq.size();
+		int counter = 0;
+		for (List<IN> elements: elementQueueState.values()) {
+			counter += elements.size();
+		}
+		return counter;
 	}
 }
@@ -87,6 +87,9 @@ public Integer getKey(Event value) throws Exception {
 		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
 		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
 		harness.processWatermark(new Watermark(2));
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamTaskState snapshot = harness.snapshot(1, 1);
 		FileOutputStream out = new FileOutputStream(
@@ -112,7 +115,6 @@ public Integer getKey(Event value) throws Exception {
 		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
 		harness.open();

-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
 		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
 		harness.processElement(new StreamRecord<>(endEvent, 5));


@@ -206,6 +208,8 @@ public void testNonKeyedCEPFunctionMigration() throws Exception {
 		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
 		harness.processWatermark(new Watermark(2));
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamTaskState snapshot = harness.snapshot(1, 1);
 		FileOutputStream out = new FileOutputStream(
@@ -233,7 +237,6 @@ public void testNonKeyedCEPFunctionMigration() throws Exception {
 		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
 		harness.open();

-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
 		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
 		harness.processElement(new StreamRecord<>(endEvent, 5));


@@ -43,7 +43,6 @@
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -140,8 +139,6 @@ public void testKeyedCEPOperatorCheckpointing() throws Exception {
 	}

 	@Test
-	@Ignore
-	// TODO: 5/19/17 Re-instate when checkpoints are fixed
 	public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {

 		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
@@ -306,11 +303,11 @@ public void testCEPOperatorCleanupEventTime() throws Exception {

 		harness.processWatermark(new Watermark(Long.MIN_VALUE));

+		harness.processElement(new StreamRecord<>(startEvent1, 1L));
+		harness.processElement(new StreamRecord<>(startEventK2, 1L));
 		harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
 		harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
 		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
-		harness.processElement(new StreamRecord<>(startEvent1, 1L));
-		harness.processElement(new StreamRecord<>(startEventK2, 1L));

 		// there must be 2 keys 42, 43 registered for the watermark callback
 		// all the seen elements must be in the priority queues but no NFA yet.
@@ -404,13 +401,13 @@ public void testCEPOperatorCleanupEventTimeWithSameElements() throws Exception {

 		harness.processWatermark(new Watermark(Long.MIN_VALUE));

+		harness.processElement(new StreamRecord<>(middle2Event1, 6));
+		harness.processElement(new StreamRecord<>(middle1Event3, 7));
 		harness.processElement(new StreamRecord<>(startEvent, 1));
 		harness.processElement(new StreamRecord<>(middle1Event1, 3));
-		harness.processElement(new StreamRecord<>(middle1Event1, 3));
+		// this and the following get reordered
 		harness.processElement(new StreamRecord<>(middle1Event2, 3));
+		harness.processElement(new StreamRecord<>(middle1Event1, 3));
 		harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5));
-		harness.processElement(new StreamRecord<>(middle2Event1, 6));
-		harness.processElement(new StreamRecord<>(middle1Event3, 7));

 		assertEquals(1L, harness.numEventTimeTimers());
 		assertEquals(7L, operator.getPQSize(41));
@@ -554,8 +551,6 @@ public void testCEPOperatorCleanupProcessingTime() throws Exception {
 	}

 	@Test
-	@Ignore
-	// TODO: 5/19/17 Re-instate when checkpoints are fixed
 	public void testCEPOperatorSerializationWRocksDB() throws Exception {
 		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
 		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
@@ -626,13 +621,13 @@ public NFA<Event> createNFA() {
 		harness.processElement(new StreamRecord<>(startEvent1, 1));
 		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
 		harness.processWatermark(2L);
+		harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
 		harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
 		harness.processElement(new StreamRecord<>(startEvent2, 4));
-		harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
 		harness.processWatermark(5L);
+		harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
 		harness.processElement(new StreamRecord<>(nextOne, 6));
 		harness.processElement(new StreamRecord<>(endEvent, 8));
-		harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
 		harness.processWatermark(100L);

 		List<List<Event>> resultingPatterns = new ArrayList<>();
Binary file not shown.
Binary file not shown.
