Skip to content

Commit

Permalink
[FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed May 17, 2017
1 parent 05ad87f commit 8e4db42
Show file tree
Hide file tree
Showing 10 changed files with 728 additions and 114 deletions.
Expand Up @@ -40,6 +40,8 @@ public class ComputationState<T> {
// the last taken event
private final T event;

private final int counter;

// timestamp of the last taken event
private final long timestamp;

Expand All @@ -58,18 +60,24 @@ private ComputationState(
final State<T> currentState,
final State<T> previousState,
final T event,
final int counter,
final long timestamp,
final DeweyNumber version,
final long startTimestamp) {
this.state = currentState;
this.event = event;
this.counter = counter;
this.timestamp = timestamp;
this.version = version;
this.startTimestamp = startTimestamp;
this.previousState = previousState;
this.conditionContext = new ConditionContext(nfa, this);
}

public int getCounter() {
return counter;
}

public ConditionContext getConditionContext() {
return conditionContext;
}
Expand Down Expand Up @@ -108,23 +116,24 @@ public DeweyNumber getVersion() {

public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state) {
Preconditions.checkArgument(state.isStart());
return new ComputationState<>(nfa, state, null, null, -1L, new DeweyNumber(1), -1L);
return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L);
}

public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state, final DeweyNumber version) {
Preconditions.checkArgument(state.isStart());
return new ComputationState<>(nfa, state, null, null, -1L, version, -1L);
return new ComputationState<>(nfa, state, null, null, 0, -1L, version, -1L);
}

public static <T> ComputationState<T> createState(
final NFA<T> nfa,
final State<T> currentState,
final State<T> previousState,
final T event,
final int counter,
final long timestamp,
final DeweyNumber version,
final long startTimestamp) {
return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp);
return new ComputationState<>(nfa, currentState, previousState, event, counter, timestamp, version, startTimestamp);
}

public boolean isStopState() {
Expand Down
Expand Up @@ -210,7 +210,8 @@ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, Li
stringSharedBuffer.release(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
computationState.getTimestamp(),
computationState.getCounter());

newComputationStates = Collections.emptyList();
} else if (event != null) {
Expand All @@ -219,7 +220,6 @@ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, Li
newComputationStates = Collections.singleton(computationState);
}


//delay adding new computation states in case a stop state is reached and we discard the path.
final Collection<ComputationState<T>> statesToRetain = new ArrayList<>();
//if stop state reached in this path
Expand All @@ -234,14 +234,16 @@ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, Li
stringSharedBuffer.release(
newComputationState.getPreviousState().getName(),
newComputationState.getEvent(),
newComputationState.getTimestamp());
newComputationState.getTimestamp(),
computationState.getCounter());
} else if (newComputationState.isStopState()) {
//reached stop state. release entry for the stop state
shouldDiscardPath = true;
stringSharedBuffer.release(
newComputationState.getPreviousState().getName(),
newComputationState.getEvent(),
newComputationState.getTimestamp());
newComputationState.getTimestamp(),
computationState.getCounter());
} else {
// add new computation state; it will be processed once the next event arrives
statesToRetain.add(newComputationState);
Expand All @@ -255,7 +257,8 @@ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, Li
stringSharedBuffer.release(
state.getPreviousState().getName(),
state.getEvent(),
state.getTimestamp());
state.getTimestamp(),
state.getCounter());
}
} else {
computationStates.addAll(statesToRetain);
Expand Down Expand Up @@ -419,6 +422,7 @@ private Collection<ComputationState<T>> computeNextStates(
edge.getTargetState(),
computationState.getPreviousState(),
computationState.getEvent(),
computationState.getCounter(),
computationState.getTimestamp(),
version,
computationState.getStartTimestamp()
Expand All @@ -437,23 +441,25 @@ private Collection<ComputationState<T>> computeNextStates(
final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
takeBranchesToVisit--;

final int counter;
final long startTimestamp;
if (computationState.isStartState()) {
startTimestamp = timestamp;
stringSharedBuffer.put(
counter = stringSharedBuffer.put(
currentState.getName(),
event,
timestamp,
currentVersion);
} else {
startTimestamp = computationState.getStartTimestamp();
stringSharedBuffer.put(
counter = stringSharedBuffer.put(
currentState.getName(),
event,
timestamp,
previousState.getName(),
previousEvent,
computationState.getTimestamp(),
computationState.getCounter(),
currentVersion);
}

Expand All @@ -462,6 +468,7 @@ private Collection<ComputationState<T>> computeNextStates(
nextState,
currentState,
event,
counter,
timestamp,
nextVersion,
startTimestamp);
Expand All @@ -474,6 +481,7 @@ private Collection<ComputationState<T>> computeNextStates(
finalState,
currentState,
event,
counter,
timestamp,
nextVersion,
startTimestamp);
Expand All @@ -497,7 +505,8 @@ private Collection<ComputationState<T>> computeNextStates(
stringSharedBuffer.release(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
computationState.getTimestamp(),
computationState.getCounter());
}

return resultingComputationStates;
Expand All @@ -508,13 +517,14 @@ private void addComputationState(
State<T> currentState,
State<T> previousState,
T event,
int counter,
long timestamp,
DeweyNumber version,
long startTimestamp) {
ComputationState<T> computationState = ComputationState.createState(
this, currentState, previousState, event, timestamp, version, startTimestamp);
this, currentState, previousState, event, counter, timestamp, version, startTimestamp);
computationStates.add(computationState);
stringSharedBuffer.lock(previousState.getName(), event, timestamp);
stringSharedBuffer.lock(previousState.getName(), event, timestamp, counter);
}

private State<T> findFinalStateAfterProceed(State<T> state, T event, ComputationState<T> computationState) {
Expand Down Expand Up @@ -603,6 +613,7 @@ Map<String, List<T>> extractCurrentMatches(final ComputationState<T> computation
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp(),
computationState.getCounter(),
computationState.getVersion());

// for a given computation state, we cannot have more than one matching patterns.
Expand Down Expand Up @@ -723,6 +734,7 @@ public boolean apply(@Nullable StateTransition<T> input) {
convertedStates.get(currentName),
previousState,
readState.getEvent(),
0,
readState.getTimestamp(),
readState.getVersion(),
readState.getStartTimestamp()
Expand Down Expand Up @@ -790,7 +802,7 @@ private ComputationState<T> readComputationState(ObjectInputStream ois) throws I
event = null;
}

return ComputationState.createState(this, state, previousState, event, timestamp, version, startTimestamp);
return ComputationState.createState(this, state, previousState, event, 0, timestamp, version, startTimestamp);
}

////////////////////// Serialization //////////////////////
Expand Down

0 comments on commit 8e4db42

Please sign in to comment.