Skip to content

Commit

Permalink
[FLINK-8365] Relax List type in HeapListState and HeapKeyedStateBackend
Browse files Browse the repository at this point in the history
This closes #5326.
  • Loading branch information
bowenli86 authored and StefanRRichter committed Jan 23, 2018
1 parent ce25688 commit e075da5
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 38 deletions.
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.HashMapSerializer;
Expand Down Expand Up @@ -240,16 +239,7 @@ public <N, T> InternalListState<N, T> createListState(
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {

// the list state does some manual mapping, because the state is typed to the generic
// 'List' interface, but we want to use an implementation typed to ArrayList
// using a more specialized implementation opens up runtime optimizations

StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
stateDesc.getName(),
stateDesc.getType(),
namespaceSerializer,
new ArrayListSerializer<T>(stateDesc.getElementSerializer()));

StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapListState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}

Expand Down
Expand Up @@ -38,7 +38,7 @@
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
implements InternalListState<N, V> {

/**
Expand All @@ -50,7 +50,7 @@ public class HeapListState<K, N, V>
*/
public HeapListState(
ListStateDescriptor<V> stateDesc,
StateTable<K, N, ArrayList<V>> stateTable,
StateTable<K, N, List<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
super(stateDesc, stateTable, keySerializer, namespaceSerializer);
Expand All @@ -74,8 +74,8 @@ public void add(V value) {
return;
}

final StateTable<K, N, ArrayList<V>> map = stateTable;
ArrayList<V> list = map.get(namespace);
final StateTable<K, N, List<V>> map = stateTable;
List<V> list = map.get(namespace);

if (list == null) {
list = new ArrayList<>();
Expand All @@ -89,7 +89,7 @@ public byte[] getSerializedValue(K key, N namespace) throws Exception {
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");

ArrayList<V> result = stateTable.get(key, namespace);
List<V> result = stateTable.get(key, namespace);

if (result == null) {
return null;
Expand Down Expand Up @@ -117,38 +117,31 @@ public byte[] getSerializedValue(K key, N namespace) throws Exception {
// ------------------------------------------------------------------------

@Override
protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
protected List<V> mergeState(List<V> a, List<V> b) {
a.addAll(b);
return a;
}

@Override
public void update(List<V> values) throws Exception {
clear();

if (values != null && !values.isEmpty()) {
final N namespace = currentNamespace;
final StateTable<K, N, ArrayList<V>> map = stateTable;

map.put(namespace, new ArrayList<>(values));
stateTable.put(currentNamespace, new ArrayList<>(values));
} else {
clear();
}
}

@Override
public void addAll(List<V> values) throws Exception {
if (values != null && !values.isEmpty()) {
final N namespace = currentNamespace;
final StateTable<K, N, ArrayList<V>> map = stateTable;

ArrayList<V> list = map.get(currentNamespace);

if (list == null) {
list = new ArrayList<>();
}

list.addAll(values);

map.put(namespace, list);
stateTable.transform(currentNamespace, values, (previousState, value) -> {
if (previousState != null) {
previousState.addAll(value);
return previousState;
} else {
return new ArrayList<>(value);
}
});
}
}
}
}
Expand Up @@ -1308,7 +1308,7 @@ public void testListStateAPIs() throws Exception {
state.update(null);
assertNull(state.get());
// update(emptyList) should remain the value null
state.update(Arrays.asList());
state.update(Collections.emptyList());
assertNull(state.get());
state.update(Arrays.asList(10L, 16L));
assertThat(state.get(), containsInAnyOrder(16L, 10L));
Expand All @@ -1320,7 +1320,7 @@ public void testListStateAPIs() throws Exception {
assertNull(state.get());
state.addAll(null);
assertNull(state.get());
state.addAll(new ArrayList<>());
state.addAll(Collections.emptyList());
assertNull(state.get());
state.addAll(Arrays.asList(3L, 4L));
assertThat(state.get(), containsInAnyOrder(3L, 4L));
Expand Down

0 comments on commit e075da5

Please sign in to comment.