Skip to content

Commit

Permalink
[FLINK-9418] Separated pruning and element processing paths
Browse files Browse the repository at this point in the history
This closes #6059
  • Loading branch information
dawidwys committed Jun 13, 2018
1 parent 9218df8 commit 05ee3ce
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 151 deletions.
Expand Up @@ -105,8 +105,7 @@ public class NFA<T> {
public NFA(
final Collection<State<T>> validStates,
final long windowTime,
final boolean handleTimeout
) {
final boolean handleTimeout) {
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
this.states = loadStates(validStates);
Expand Down Expand Up @@ -169,7 +168,6 @@ private boolean isFinalState(ComputationState state) {
return stateObject.isFinal();
}


/**
* Processes the next input event. If some of the computations reach a final state then the
* resulting event sequences are returned. If computations time out and timeout handling is
Expand All @@ -185,8 +183,9 @@ private boolean isFinalState(ComputationState state) {
* @return The collection of matched patterns (e.g. the result of computations which have
* reached a final state). Timed out partial matches are not returned here; they are
* emitted by {@code advanceTime}.
* @throws Exception Thrown if the system cannot access the state.
*/
public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(
public Collection<Map<String, List<T>>> process(
final SharedBuffer<T> sharedBuffer,
final NFAState nfaState,
final T event,
Expand All @@ -210,61 +209,91 @@ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, Li
* @return The collection of matched patterns (e.g. the result of computations which have
* reached a final state). Timed out partial matches are not returned here; they are
* emitted by {@code advanceTime}.
* @throws Exception Thrown if the system cannot access the state.
*/
public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(
public Collection<Map<String, List<T>>> process(
final SharedBuffer<T> sharedBuffer,
final NFAState nfaState,
final T event,
final long timestamp,
final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {

try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBuffer)) {
return doProcess(sharedBuffer, nfaState, eventWrapper, afterMatchSkipStrategy);
}
}

private Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> doProcess(
/**
* Prunes states assuming there will be no events with timestamp <b>lower</b> than the given one.
* It clears the sharedBuffer and also emits all timed out partial matches.
*
* @param sharedBuffer the SharedBuffer object that we need to work upon while processing
* @param nfaState The NFAState object that we need to affect while processing
* @param timestamp timestamp that indicates that there will be no more events with lower timestamp
* @return all timed out partial matches
* @throws Exception Thrown if the system cannot access the state.
*/
public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
final SharedBuffer<T> sharedBuffer,
final NFAState nfaState,
final EventWrapper event,
final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
final long timestamp) throws Exception {

Queue<ComputationState> computationStates = nfaState.getComputationStates();

final int numberComputationStates = computationStates.size();
final Collection<Map<String, List<T>>> result = new ArrayList<>();
final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();

// iterate over all current computations
final int numberComputationStates = computationStates.size();
for (int i = 0; i < numberComputationStates; i++) {
ComputationState computationState = computationStates.poll();

final Collection<ComputationState> newComputationStates;

if (!isStartState(computationState) &&
windowTime > 0L &&
event.getTimestamp() - computationState.getStartTimestamp() >= windowTime) {
if (isStateTimedOut(computationState, timestamp)) {

if (handleTimeout) {
// extract the timed out event pattern
Map<String, List<T>> timedOutPattern = extractCurrentMatches(sharedBuffer, computationState);
timeoutResult.add(Tuple2.of(timedOutPattern, event.getTimestamp()));
timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
}

sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());

newComputationStates = Collections.emptyList();
nfaState.setStateChanged();
} else if (event.getEvent() != null) {
newComputationStates = computeNextStates(sharedBuffer, computationState, event, event.getTimestamp());

if (newComputationStates.size() != 1) {
nfaState.setStateChanged();
} else if (!newComputationStates.iterator().next().equals(computationState)) {
nfaState.setStateChanged();
}
} else {
newComputationStates = Collections.singleton(computationState);
computationStates.add(computationState);
}
}

sharedBuffer.advanceTime(timestamp);

return timeoutResult;
}

/**
 * Checks whether the given computation state has exceeded the configured time window
 * relative to the given timestamp.
 *
 * <p>A start state is never reported as timed out, and nothing is reported as timed out
 * when {@code windowTime} is not positive.
 *
 * @param state the computation state to check
 * @param timestamp the timestamp to compare against the state's start timestamp
 * @return {@code true} if the state is not a start state, {@code windowTime} is positive and
 * {@code timestamp - state.getStartTimestamp()} is at least {@code windowTime}
 */
private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
	if (isStartState(state)) {
		return false;
	}
	if (windowTime <= 0L) {
		return false;
	}
	final long elapsedSinceStart = timestamp - state.getStartTimestamp();
	return elapsedSinceStart >= windowTime;
}

private Collection<Map<String, List<T>>> doProcess(
final SharedBuffer<T> sharedBuffer,
final NFAState nfaState,
final EventWrapper event,
final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {

Queue<ComputationState> computationStates = nfaState.getComputationStates();

final int numberComputationStates = computationStates.size();
final Collection<Map<String, List<T>>> result = new ArrayList<>();

// iterate over all current computations
for (int i = 0; i < numberComputationStates; i++) {
ComputationState computationState = computationStates.poll();

final Collection<ComputationState> newComputationStates = computeNextStates(
sharedBuffer,
computationState,
event,
event.getTimestamp());

if (newComputationStates.size() != 1) {
nfaState.setStateChanged();
} else if (!newComputationStates.iterator().next().equals(computationState)) {
nfaState.setStateChanged();
}

//delay adding new computation states in case a stop state is reached and we discard the path.
Expand Down Expand Up @@ -299,13 +328,12 @@ private Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, L
} else {
computationStates.addAll(statesToRetain);
}

}

discardComputationStatesAccordingToStrategy(
sharedBuffer, computationStates, result, afterMatchSkipStrategy);

return Tuple2.of(result, timeoutResult);
return result;
}

private void discardComputationStatesAccordingToStrategy(
Expand Down Expand Up @@ -500,6 +528,7 @@ public void close() throws Exception {
* @param event Current event which is processed
* @param timestamp Timestamp of the current event
* @return Collection of computation states which result from the current one
* @throws Exception Thrown if the system cannot access the state.
*/
private Collection<ComputationState> computeNextStates(
final SharedBuffer<T> sharedBuffer,
Expand Down Expand Up @@ -558,22 +587,17 @@ private Collection<ComputationState> computeNextStates(
final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
takeBranchesToVisit--;

final NodeId newEntry;
final NodeId newEntry = sharedBuffer.put(
currentState.getName(),
event.getEventId(),
previousEntry,
currentVersion);

final long startTimestamp;
if (isStartState(computationState)) {
startTimestamp = timestamp;
newEntry = sharedBuffer.put(
currentState.getName(),
event.getEventId(),
previousEntry,
currentVersion);
} else {
startTimestamp = computationState.getStartTimestamp();
newEntry = sharedBuffer.put(
currentState.getName(),
event.getEventId(),
previousEntry,
currentVersion);
}

addComputationState(
Expand Down Expand Up @@ -631,7 +655,10 @@ private void addComputationState(
sharedBuffer.lockNode(previousEntry);
}

private State<T> findFinalStateAfterProceed(SharedBuffer<T> sharedBuffer, State<T> state, T event,
private State<T> findFinalStateAfterProceed(
SharedBuffer<T> sharedBuffer,
State<T> state,
T event,
ComputationState computationState) {
final Stack<State<T>> statesToCheck = new Stack<>();
statesToCheck.push(state);
Expand Down Expand Up @@ -661,7 +688,9 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches);
}

private OutgoingEdges<T> createDecisionGraph(SharedBuffer<T> sharedBuffer, ComputationState computationState,
private OutgoingEdges<T> createDecisionGraph(
SharedBuffer<T> sharedBuffer,
ComputationState computationState,
T event) {
State<T> state = getState(computationState);
final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(state);
Expand Down Expand Up @@ -699,8 +728,11 @@ private OutgoingEdges<T> createDecisionGraph(SharedBuffer<T> sharedBuffer, Compu
return outgoingEdges;
}

private boolean checkFilterCondition(SharedBuffer<T> sharedBuffer, ComputationState computationState,
IterativeCondition<T> condition, T event) throws Exception {
private boolean checkFilterCondition(
SharedBuffer<T> sharedBuffer,
ComputationState computationState,
IterativeCondition<T> condition,
T event) throws Exception {
return condition == null || condition.filter(event, new ConditionContext<>(this, sharedBuffer, computationState));
}

Expand All @@ -712,8 +744,10 @@ private boolean checkFilterCondition(SharedBuffer<T> sharedBuffer, ComputationSt
* @param sharedBuffer The {@link SharedBuffer} from which to extract the matches
* @param computationState The end computation state of the extracted event sequences
* @return Collection of event sequences which end in the given computation state
* @throws Exception Thrown if the system cannot access the state.
*/
private Map<String, List<T>> extractCurrentMatches(final SharedBuffer<T> sharedBuffer,
private Map<String, List<T>> extractCurrentMatches(
final SharedBuffer<T> sharedBuffer,
final ComputationState computationState) throws Exception {
if (computationState.getPreviousBufferEntry() == null) {
return new HashMap<>();
Expand Down Expand Up @@ -819,8 +853,7 @@ public Queue<ComputationState> getComputationStates() {

MigratedNFA(
final Queue<ComputationState> computationStates,
final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer
) {
final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer) {
this.sharedBuffer = sharedBuffer;
this.computationStates = computationStates;
}
Expand All @@ -838,8 +871,8 @@ public static final class NFASerializerConfigSnapshot<T> extends CompositeTypeSe
public NFASerializerConfigSnapshot() {}

public NFASerializerConfigSnapshot(
TypeSerializer<T> eventSerializer,
TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
TypeSerializer<T> eventSerializer,
TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {

super(eventSerializer, sharedBufferSerializer);
}
Expand Down
Expand Up @@ -251,7 +251,7 @@ public SharedBuffer<V> deserialize(DataInputView source) throws IOException {
Map<ValueTimeWrapper<V>, EventId> values = new HashMap<>();
Map<EventId, Lockable<V>> valuesWithIds = new HashMap<>();
Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext = new HashMap<>();
Map<Long, Long> totalEventsPerTimestamp = new HashMap<>();
Map<Long, Integer> totalEventsPerTimestamp = new HashMap<>();
int totalPages = source.readInt();

for (int i = 0; i < totalPages; i++) {
Expand All @@ -263,7 +263,7 @@ public SharedBuffer<V> deserialize(DataInputView source) throws IOException {
ValueTimeWrapper<V> wrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
EventId eventId = values.get(wrapper);
if (eventId == null) {
long id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0L);
int id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0);
eventId = new EventId(id, wrapper.timestamp);
values.put(wrapper, eventId);
valuesWithIds.put(eventId, new Lockable<>(wrapper.value, 1));
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.cep.nfa.sharedbuffer;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
Expand All @@ -31,15 +32,15 @@
* Composite key for events in {@link SharedBuffer}.
*/
public class EventId {
private final long id;
private final int id;
private final long timestamp;

public EventId(long id, long timestamp) {
public EventId(int id, long timestamp) {
this.id = id;
this.timestamp = timestamp;
}

public long getId() {
public int getId() {
return id;
}

Expand Down Expand Up @@ -110,14 +111,14 @@ public int getLength() {

@Override
public void serialize(EventId record, DataOutputView target) throws IOException {
LongSerializer.INSTANCE.serialize(record.id, target);
IntSerializer.INSTANCE.serialize(record.id, target);
LongSerializer.INSTANCE.serialize(record.timestamp, target);
}

@Override
public EventId deserialize(DataInputView source) throws IOException {
Long id = LongSerializer.INSTANCE.deserialize(source);
Long timestamp = LongSerializer.INSTANCE.deserialize(source);
int id = IntSerializer.INSTANCE.deserialize(source);
long timestamp = LongSerializer.INSTANCE.deserialize(source);

return new EventId(id, timestamp);
}
Expand All @@ -129,7 +130,7 @@ public EventId deserialize(EventId reuse, DataInputView source) throws IOExcepti

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
LongSerializer.INSTANCE.copy(source, target);
IntSerializer.INSTANCE.copy(source, target);
LongSerializer.INSTANCE.copy(source, target);
}

Expand Down
Expand Up @@ -62,10 +62,6 @@ boolean release() {
return refCounter == 0;
}

int getRefCounter() {
return refCounter;
}

/**
* Returns the value of the {@code element} field held by this instance.
*
* @return the stored element
*/
public T getElement() {
return element;
}
Expand Down Expand Up @@ -143,7 +139,7 @@ public void serialize(Lockable<E> record, DataOutputView target) throws IOExcept

@Override
public Lockable<E> deserialize(DataInputView source) throws IOException {
Integer refCount = IntSerializer.INSTANCE.deserialize(source);
int refCount = IntSerializer.INSTANCE.deserialize(source);
E record = elementSerializer.deserialize(source);
return new Lockable<>(record, refCount);
}
Expand Down

0 comments on commit 05ee3ce

Please sign in to comment.