Skip to content

Commit

Permalink
[FLINK-9418] Migrate SharedBuffer to use MapState
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Jun 13, 2018
1 parent 45d5442 commit 9218df8
Show file tree
Hide file tree
Showing 42 changed files with 3,126 additions and 2,465 deletions.
Expand Up @@ -18,78 +18,51 @@

package org.apache.flink.cep.nfa;

import org.apache.flink.cep.nfa.sharedbuffer.NodeId;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* Helper class which encapsulates the state of the NFA computation. It points to the current state,
* the last taken event, its occurrence timestamp, the current version and the starting timestamp
* Helper class which encapsulates the currentStateName of the NFA computation. It points to the current currentStateName,
* the previous entry of the pattern, the current version and the starting timestamp
* of the overall pattern.
*
* @param <T> Type of the input events
*/
public class ComputationState<T> {
// pointer to the NFA state of the computation
private final String state;

// the last taken event
private final T event;

private final int counter;
public class ComputationState {
// pointer to the NFA currentStateName of the computation
private final String currentStateName;

// timestamp of the last taken event
private final long timestamp;

// The current version of the state to discriminate the valid pattern paths in the SharedBuffer
// The current version of the currentStateName to discriminate the valid pattern paths in the SharedBuffer
private final DeweyNumber version;

// Timestamp of the first element in the pattern
private final long startTimestamp;

@Nullable
private final String previousState;
private final NodeId previousBufferEntry;

private ComputationState(
final String currentState,
@Nullable final String previousState,
final T event,
final int counter,
final long timestamp,
@Nullable final NodeId previousBufferEntry,
final DeweyNumber version,
final long startTimestamp) {
this.state = currentState;
this.event = event;
this.counter = counter;
this.timestamp = timestamp;
this.currentStateName = currentState;
this.version = version;
this.startTimestamp = startTimestamp;
this.previousState = previousState;
}

public int getCounter() {
return counter;
this.previousBufferEntry = previousBufferEntry;
}

public long getTimestamp() {
return timestamp;
public NodeId getPreviousBufferEntry() {
return previousBufferEntry;
}

public long getStartTimestamp() {
return startTimestamp;
}

public String getState() {
return state;
}

@Nullable
public String getPreviousState() {
return previousState;
}

public T getEvent() {
return event;
public String getCurrentStateName() {
return currentStateName;
}

public DeweyNumber getVersion() {
Expand All @@ -100,40 +73,43 @@ public DeweyNumber getVersion() {
public boolean equals(Object obj) {
if (obj instanceof ComputationState) {
ComputationState other = (ComputationState) obj;
return Objects.equals(state, other.state) &&
Objects.equals(event, other.event) &&
counter == other.counter &&
timestamp == other.timestamp &&
return Objects.equals(currentStateName, other.currentStateName) &&
Objects.equals(version, other.version) &&
startTimestamp == other.startTimestamp &&
Objects.equals(previousState, other.previousState);

Objects.equals(previousBufferEntry, other.previousBufferEntry);
} else {
return false;
}
}

@Override
public String toString() {
return "ComputationState{" +
"currentStateName='" + currentStateName + '\'' +
", version=" + version +
", startTimestamp=" + startTimestamp +
", previousBufferEntry=" + previousBufferEntry +
'}';
}

@Override
public int hashCode() {
return Objects.hash(state, event, counter, timestamp, version, startTimestamp, previousState);
return Objects.hash(currentStateName, version, startTimestamp, previousBufferEntry);
}

public static <T> ComputationState<T> createStartState(final String state) {
return new ComputationState<>(state, null, null, 0, -1L, new DeweyNumber(1), -1L);
public static ComputationState createStartState(final String state) {
return createStartState(state, new DeweyNumber(1));
}

public static <T> ComputationState<T> createStartState(final String state, final DeweyNumber version) {
return new ComputationState<T>(state, null, null, 0, -1L, version, -1L);
public static ComputationState createStartState(final String state, final DeweyNumber version) {
return createState(state, null, version, -1L);
}

public static <T> ComputationState<T> createState(
public static ComputationState createState(
final String currentState,
final String previousState,
final T event,
final int counter,
final long timestamp,
final NodeId previousEntry,
final DeweyNumber version,
final long startTimestamp) {
return new ComputationState<>(currentState, previousState, event, counter, timestamp, version, startTimestamp);
return new ComputationState(currentState, previousEntry, version, startTimestamp);
}
}
Expand Up @@ -192,6 +192,10 @@ public static class DeweyNumberSerializer extends TypeSerializerSingleton<DeweyN

private final IntSerializer elemSerializer = IntSerializer.INSTANCE;

public static final DeweyNumberSerializer INSTANCE = new DeweyNumberSerializer();

private DeweyNumberSerializer() {}

@Override
public boolean isImmutableType() {
return false;
Expand Down
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cep.nfa;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.EnumSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.util.Preconditions;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.LinkedList;
import java.util.Queue;

/**
* Methods for deserialization of old format NFA.
*/
class MigrationUtils {

/**
* Skips bytes corresponding to serialized states. In flink 1.6+ the states are no longer kept in state.
*/
static <T> void skipSerializedStates(DataInputView in) throws IOException {
TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);

final int noOfStates = in.readInt();

for (int i = 0; i < noOfStates; i++) {
nameSerializer.deserialize(in);
stateTypeSerializer.deserialize(in);
}

for (int i = 0; i < noOfStates; i++) {
String srcName = nameSerializer.deserialize(in);

int noOfTransitions = in.readInt();
for (int j = 0; j < noOfTransitions; j++) {
String src = nameSerializer.deserialize(in);
Preconditions.checkState(src.equals(srcName),
"Source Edge names do not match (" + srcName + " - " + src + ").");

nameSerializer.deserialize(in);
actionSerializer.deserialize(in);

try {
skipCondition(in);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
}

private static <T> void skipCondition(DataInputView in) throws IOException, ClassNotFoundException {
boolean hasCondition = in.readBoolean();
if (hasCondition) {
int length = in.readInt();

byte[] serCondition = new byte[length];
in.read(serCondition);

ByteArrayInputStream bais = new ByteArrayInputStream(serCondition);
ObjectInputStream ois = new ObjectInputStream(bais);

ois.readObject();
ois.close();
bais.close();
}
}

static <T> Queue<ComputationState> deserializeComputationStates(
org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer,
TypeSerializer<T> eventSerializer,
DataInputView source) throws IOException {

Queue<ComputationState> computationStates = new LinkedList<>();
StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
LongSerializer timestampSerializer = LongSerializer.INSTANCE;
DeweyNumber.DeweyNumberSerializer versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;

int computationStateNo = source.readInt();
for (int i = 0; i < computationStateNo; i++) {
String state = stateNameSerializer.deserialize(source);
String prevState = stateNameSerializer.deserialize(source);
long timestamp = timestampSerializer.deserialize(source);
DeweyNumber version = versionSerializer.deserialize(source);
long startTimestamp = timestampSerializer.deserialize(source);
int counter = source.readInt();

T event = null;
if (source.readBoolean()) {
event = eventSerializer.deserialize(source);
}

NodeId nodeId;
if (prevState != null) {
nodeId = sharedBuffer.getNodeId(prevState, timestamp, counter, event);
} else {
nodeId = null;
}

computationStates.add(ComputationState.createState(state, nodeId, version, startTimestamp));
}
return computationStates;
}

private MigrationUtils() {
}
}

0 comments on commit 9218df8

Please sign in to comment.