From 41fac2400ca411ccce56851424d78802378d93a3 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 25 Jan 2017 16:07:50 +0100 Subject: [PATCH 1/3] [FLINK-5642][query] fix a race condition with HeapListState The idiom behind AppendingState#get() is to return a copy of the underlying value or at least not to allow changes to the underlying state storage. However, the heap state backend returns the original list, which is backed by an ArrayList and therefore not thread-safe. The operator/window evictor threads access the state only one at a time, but queryable state may access it at any time in order not to slow down normal operation. Any structural changes to the ArrayList are thus unsafe and are hereby synchronized in case the state is queryable. --- .../runtime/state/heap/HeapListState.java | 18 +++- .../runtime/state/StateBackendTestBase.java | 91 +++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index f0eb53e9decbc..8866d1548f1db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -116,7 +116,13 @@ public void add(V value) { list = new ArrayList<>(); keyedMap.put(backend.getCurrentKey(), list); } - list.add(value); + if (stateDesc.isQueryable()) { + synchronized (list) { + list.add(value); + } + } else { + list.add(value); + } } @Override @@ -143,6 +149,16 @@ public byte[] getSerializedValue(K key, N namespace) throws Exception { return null; } + if (stateDesc.isQueryable()) { + synchronized (result) { + return serializeList(result); + } + } else { + return serializeList(result); + } + } + + private byte[] serializeList(final ArrayList result) throws java.io.IOException { TypeSerializer serializer = 
stateDesc.getSerializer(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index b4bf664cdb3e0..8b2b28b5bec1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -1120,6 +1120,97 @@ public void testListStateRestoreWithWrongSerializers() { } } + /** + * Tests {@link ListState#add(Object)} and {@link InternalKvState#getSerializedValue(byte[])} + * accessing the state concurrently for queryable state (otherwise the state is not accessed + * concurrently). They should not get in the way of each other. + */ + @Test + public void testListStateRace() throws Exception { + final AbstractKeyedStateBackend backend = + createKeyedBackend(IntSerializer.INSTANCE); + final Integer namespace = 1; + + final ListStateDescriptor kvId = new ListStateDescriptor<>("id", String.class); + kvId.setQueryable("testListStateRace"); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + final TypeSerializer keySerializer = IntSerializer.INSTANCE; + final TypeSerializer namespaceSerializer = IntSerializer.INSTANCE; + final TypeSerializer valueSerializer = kvId.getSerializer(); + + final ListState state = backend + .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId); + + final InternalKvState kvState = (InternalKvState) state; + + // some modifications to the state + final int key = 10; + backend.setCurrentKey(key); + assertNull(state.get()); + assertNull(getSerializedList(kvState, key, keySerializer, + namespace, namespaceSerializer, valueSerializer)); + final String strVal = "1"; + state.add(strVal); + + final CheckedThread writer = new CheckedThread("State writer") { + @Override + public void go() throws Exception { + while (!isInterrupted()) { 
+ // some list state modifications + state.clear(); + state.add(strVal); + Thread.yield(); + } + } + }; + + final CheckedThread serializedGetter = new CheckedThread("Serialized state getter") { + @Override + public void go() throws Exception { + while(!isInterrupted() && writer.isAlive()) { + final List serializedValue = + getSerializedList(kvState, key, keySerializer, + namespace, namespaceSerializer, + valueSerializer); + if (serializedValue != null) { + for (String str : serializedValue) { + assertEquals(strVal, str); + } + } + Thread.yield(); + } + } + }; + + writer.start(); + serializedGetter.start(); + + // run both threads for max 100ms + Timer t = new Timer("stopper"); + t.schedule(new TimerTask() { + @Override + public void run() { + writer.interrupt(); + serializedGetter.interrupt(); + this.cancel(); + } + }, 100); + + // wait for both threads to finish + try { + // serializedGetter will finish if its assertion fails or if writer is not alive any more + serializedGetter.sync(); + // if serializedGetter crashed, writer will not know -> interrupt just in case + writer.interrupt(); + writer.sync(); + t.cancel(); // if not executed yet + } finally { + // clean up + backend.dispose(); + } + } + @Test @SuppressWarnings("unchecked") public void testReducingStateRestoreWithWrongSerializers() { From cdec2d29f5b2cd56689b0c72001028268a542dcd Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 25 Jan 2017 16:04:15 +0100 Subject: [PATCH 2/3] [FLINK-5642] fail when calling Iterator#remove() on queryable list state returned from HeapListState#get() The idiom behind AppendingState#get() is to return a copy of the value behind or at least not to allow changes to the underlying state storage. However, the heap state backend returns the original list and thus is prone to changes. The user cannot rely on changes to be reflected by the backing store but, if correctly used, e.g. by clearing the list and re-adding all elements afterwards, changes may still be ok. 
However, in conjunction with queryable state, any structural changes to the backing ArrayList lead to races (as may changes to the stored objects but we cannot forbid that for now). By forbidding ArrayList#remove(), we can at least forbid Iterator#remove() which is the only structural change the API offers on the Iterable that HeapListState#get() returns. --- .../state/RocksDBStateBackendTest.java | 7 +++ .../runtime/state/heap/HeapListState.java | 40 ++++++++++++++++- .../runtime/state/StateBackendTestBase.java | 43 +++++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index dc90666935b60..86ca796f6ca75 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -455,4 +456,10 @@ public boolean accept(File file, String s) { return true; } } + + @Override + @Test + @Ignore("Since RocksDB returns copies, we allow any operation and ignore this test") + public void testListStateIteratorRemove() { + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index 8866d1548f1db..310962b9d8fb0 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -43,6 +43,40 @@ public class HeapListState extends AbstractHeapMergingState, ArrayList, ListState, ListStateDescriptor> implements InternalListState { + /** + * Private class extending ArrayList which forbids {@link #remove(int)} that is used by {@link + * #iterator()}'s remove function. + *

+ * This is useful for the {@link HeapListState#get()} function that returns an {@link Iterable}. + * By using {@link Iterable#iterator()}, the user may call {@link java.util.Iterator#remove} + * which modifies the list. {@link HeapListState#get()}, however, should only return a copy but + * actually returns on the real value which queryable state reads concurrently. In order not to + * create any races during structural changes, we thus forbid {@link #remove(int)}. + *

+ * Note: we only make the {@link #remove(int)} function unsupported so this is not a + * real immutable arraylist. Also, future changes in {@link ArrayList} are not covered since we + * do not have control over its iterator class. + * + * @param + * list element type + */ + private static class QueryableStateArrayList extends ArrayList { + private static final long serialVersionUID = 1L; + + /** + * Unsupported operation. + * + * @throw UnsupportedOperationException always thrown + * @deprecated unsupported + */ + @Deprecated + @Override + public V remove(final int index) { + throw new UnsupportedOperationException( + "Structural changes to queryable list state are not allowed."); + } + } + /** * Creates a new key/value state for the given hash map of key/value pairs. * @@ -113,7 +147,11 @@ public void add(V value) { ArrayList list = keyedMap.get(backend.getCurrentKey()); if (list == null) { - list = new ArrayList<>(); + if (stateDesc.isQueryable()) { + list = new QueryableStateArrayList<>(); + } else { + list = new ArrayList<>(); + } keyedMap.put(backend.getCurrentKey(), list); } if (stateDesc.isQueryable()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 8b2b28b5bec1b..daf83d079633f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -55,6 +55,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Timer; @@ -1211,6 +1212,48 @@ public void run() { } } + /** + * Tests {@link Iterator#remove()} on the iterator available through {@link ListState#get()}. + * This should fail with an {@link UnsupportedOperationException}. 
+ */ + @Test(expected = UnsupportedOperationException.class) + public void testListStateIteratorRemove() throws Exception { + final AbstractKeyedStateBackend backend = + createKeyedBackend(IntSerializer.INSTANCE); + final Integer namespace = 1; + + final ListStateDescriptor kvId = new ListStateDescriptor<>("id", String.class); + kvId.setQueryable("testListStateRace"); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + final TypeSerializer keySerializer = IntSerializer.INSTANCE; + final TypeSerializer namespaceSerializer = IntSerializer.INSTANCE; + final TypeSerializer valueSerializer = kvId.getSerializer(); + + final ListState state = backend + .getPartitionedState(namespace, IntSerializer.INSTANCE, kvId); + + final InternalKvState kvState = (InternalKvState) state; + + // some modifications to the state + final int key = 10; + backend.setCurrentKey(key); + assertNull(state.get()); + assertNull(getSerializedList(kvState, key, keySerializer, + namespace, namespaceSerializer, valueSerializer)); + state.add("1"); + + // removing an element through the iterator is expected to throw + try { + Iterator iterator = state.get().iterator(); + iterator.next(); + iterator.remove(); + } finally { + // clean up + backend.dispose(); + } + } + @Test @SuppressWarnings("unchecked") public void testReducingStateRestoreWithWrongSerializers() { From 3f7a549c650791dc894e67e8f69f225142954de9 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 3 Feb 2017 13:28:15 +0100 Subject: [PATCH 3/3] [FLINK-5642] refactor code branching for queryable list state Use a specialised class for queryable list heap state to avoid unnecessary branches in the common case, i.e. non-queryable state. 
--- .../state/heap/HeapKeyedStateBackend.java | 6 +- .../runtime/state/heap/HeapListState.java | 124 +++++++---------- .../state/heap/QueryableHeapListState.java | 127 ++++++++++++++++++ 3 files changed, 184 insertions(+), 73 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/QueryableHeapListState.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 0fe92e76dfbc5..0398e464bd488 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -161,7 +161,11 @@ public InternalListState createListState( new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer())); stateTable = tryRegisterStateTable(stateTable, newMetaInfo); - return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); + if (stateDesc.isQueryable()) { + return new QueryableHeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); + } else { + return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index 310962b9d8fb0..04e27bd0034b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -43,40 +43,6 @@ public class HeapListState extends AbstractHeapMergingState, ArrayList, ListState, ListStateDescriptor> implements InternalListState { - /** - * Private class extending ArrayList 
which forbids {@link #remove(int)} that is used by {@link - * #iterator()}'s remove function. - *

- * This is useful for the {@link HeapListState#get()} function that returns an {@link Iterable}. - * By using {@link Iterable#iterator()}, the user may call {@link java.util.Iterator#remove} - * which modifies the list. {@link HeapListState#get()}, however, should only return a copy but - * actually returns on the real value which queryable state reads concurrently. In order not to - * create any races during structural changes, we thus forbid {@link #remove(int)}. - *

- * Note: we only make the {@link #remove(int)} function unsupported so this is not a - * real immutable arraylist. Also, future changes in {@link ArrayList} are not covered since we - * do not have control over its iterator class. - * - * @param - * list element type - */ - private static class QueryableStateArrayList extends ArrayList { - private static final long serialVersionUID = 1L; - - /** - * Unsupported operation. - * - * @throw UnsupportedOperationException always thrown - * @deprecated unsupported - */ - @Deprecated - @Override - public V remove(final int index) { - throw new UnsupportedOperationException( - "Structural changes to queryable list state are not allowed."); - } - } - /** * Creates a new key/value state for the given hash map of key/value pairs. * @@ -119,18 +85,15 @@ public Iterable get() { return keyedMap.get(backend.getCurrentKey()); } - @Override - public void add(V value) { - Preconditions.checkState(currentNamespace != null, "No namespace set."); - Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); - - if (value == null) { - clear(); - return; - } - + /** + * Retrieves the list state for the current key and namespace creating new objects if the + * requested state does not exist. 
+ * + * @return list state for the current key and namespace + */ + protected final ArrayList creatingGetListState() { Map>> namespaceMap = - stateTable.get(backend.getCurrentKeyGroupIndex()); + stateTable.get(backend.getCurrentKeyGroupIndex()); if (namespaceMap == null) { namespaceMap = createNewMap(); @@ -147,29 +110,27 @@ public void add(V value) { ArrayList list = keyedMap.get(backend.getCurrentKey()); if (list == null) { - if (stateDesc.isQueryable()) { - list = new QueryableStateArrayList<>(); - } else { - list = new ArrayList<>(); - } + list = newList(); keyedMap.put(backend.getCurrentKey(), list); } - if (stateDesc.isQueryable()) { - synchronized (list) { - list.add(value); - } - } else { - list.add(value); - } + return list; } - - @Override - public byte[] getSerializedValue(K key, N namespace) throws Exception { - Preconditions.checkState(namespace != null, "No namespace given."); - Preconditions.checkState(key != null, "No key given."); + + /** + * Retrieves the list state for the given key and namespace without creating new objects if the + * requested state does not exist. 
+ * + * @param key + * key to request + * @param namespace + * namespace of the key to request + * + * @return list state for the given key and namespace pair or null if it does not exist + */ + protected final ArrayList nonCreatingGetListState(final K key, final N namespace) { Map>> namespaceMap = - stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups())); + stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups())); if (namespaceMap == null) { return null; @@ -181,22 +142,41 @@ public byte[] getSerializedValue(K key, N namespace) throws Exception { return null; } - ArrayList result = keyedMap.get(key); + return keyedMap.get(key); + } + + @Override + public void add(V value) { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + if (value == null) { + clear(); + return; + } + + ArrayList list = creatingGetListState(); + list.add(value); + } + + protected ArrayList newList() { + return new ArrayList<>(); + } + + @Override + public byte[] getSerializedValue(K key, N namespace) throws Exception { + Preconditions.checkState(namespace != null, "No namespace given."); + Preconditions.checkState(key != null, "No key given."); + + ArrayList result = nonCreatingGetListState(key, namespace); if (result == null) { return null; } - if (stateDesc.isQueryable()) { - synchronized (result) { - return serializeList(result); - } - } else { - return serializeList(result); - } + return serializeList(result); } - private byte[] serializeList(final ArrayList result) throws java.io.IOException { + protected final byte[] serializeList(final ArrayList result) throws java.io.IOException { TypeSerializer serializer = stateDesc.getSerializer(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/QueryableHeapListState.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/QueryableHeapListState.java new file mode 100644 index 0000000000000..262028006383f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/QueryableHeapListState.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; + +/** + * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted + * into files and supports state queries. + * + * As opposed to {@link HeapListState}, this class avoids some race conditions that occur when + * state queries access state concurrently to operators changing state. + * + * @param The type of the key. + * @param The type of the namespace. + * @param The type of the value. 
+ */ +public class QueryableHeapListState extends HeapListState { + + /** + * Private class extending ArrayList which forbids {@link #remove(int)} that is used by {@link + * #iterator()}'s remove function. + *

+ * This is useful for the {@link HeapListState#get()} function that returns an {@link Iterable}. + * By using {@link Iterable#iterator()}, the user may call {@link java.util.Iterator#remove} + * which modifies the list. {@link HeapListState#get()}, however, should only return a copy but + * actually returns the real value, which queryable state reads concurrently. In order not to + * create any races during structural changes, we thus forbid {@link #remove(int)}. + *

+ * Note: we only make the {@link #remove(int)} function unsupported so this is not a + * real immutable arraylist. Also, future changes in {@link ArrayList} are not covered since we + * do not have control over its iterator class. + * + * @param + * list element type + */ + private static class QueryableStateArrayList extends ArrayList { + private static final long serialVersionUID = 1L; + + /** + * Unsupported operation. + * + * @throws UnsupportedOperationException always thrown + * @deprecated unsupported + */ + @Deprecated + @Override + public V remove(final int index) { + throw new UnsupportedOperationException( + "Structural changes to queryable list state are not allowed."); + } + } + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param backend The state backend that created this state. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state table to use in this key/value state. May contain initial state. 
+ */ + public QueryableHeapListState( + KeyedStateBackend backend, + ListStateDescriptor stateDesc, + StateTable> stateTable, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer) { + super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public void add(V value) { + Preconditions.checkState(currentNamespace != null, "No namespace set."); + Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); + + if (value == null) { + clear(); + return; + } + + ArrayList list = creatingGetListState(); + synchronized (list) { + list.add(value); + } + } + + protected ArrayList newList() { + return new QueryableStateArrayList<>(); + } + + @Override + public byte[] getSerializedValue(K key, N namespace) throws Exception { + Preconditions.checkState(namespace != null, "No namespace given."); + Preconditions.checkState(key != null, "No key given."); + + ArrayList result = nonCreatingGetListState(key, namespace); + if (result == null) { + return null; + } + + synchronized (result) { + return serializeList(result); + } + } + +}