Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7475] [core][DataStream API] support update() in ListState #4963

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/dev/stream/state/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ for each key that the operation sees). The value can be set using `update(T)` an

* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
over all currently stored elements. Elements are added using `add(T)`, the Iterable can
be retrieved using `Iterable<T> get()`.
be retrieved using `Iterable<T> get()`. You can also override the existing list with `update(List<T>)`.

* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
added to the state. The interface is the same as for `ListState` but elements added using
added to the state. The interface is similar to `ListState` but elements added using
`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.

* `AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values
Expand All @@ -108,7 +108,7 @@ added using `add(IN)` are aggregated using a specified `AggregateFunction`.

* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
of elements that are added to the state. The interface is the same as for `ListState` but elements
of elements that are added to the state. The interface is similar to `ListState` but elements
added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.

* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,5 +751,14 @@ public List<T> getList() {
/**
 * Returns the current value of the {@code clearCalled} field.
 *
 * <p>NOTE(review): presumably the flag is set when {@code clear()} is invoked on this test
 * state; the code that writes the field is outside this view - confirm.
 *
 * @return the value of {@code clearCalled}
 */
public boolean isClearCalled() {
return clearCalled;
}

/**
 * Replaces the content of this test list state with the given values.
 *
 * <p>{@code clear()} is always invoked first; afterwards the elements of {@code values} are
 * appended to the backing list, unless {@code values} is {@code null} or empty.
 *
 * @param values the new elements, may be {@code null} or empty
 */
@Override
public void update(List<T> values) throws Exception {
    clear();

    // Nothing to copy over: the state simply stays cleared.
    if (values == null || values.isEmpty()) {
        return;
    }

    list.addAll(values);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -138,12 +137,12 @@ public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exce
// arbitrary checkpoint id and timestamp
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

Assert.assertTrue(listState.isClearCalled());
assertTrue(listState.isClearCalled());

// the checkpointed list state should contain only the shards that it should subscribe to
Assert.assertEquals(globalUnionState.size() / 2, listState.getList().size());
Assert.assertTrue(listState.getList().contains(globalUnionState.get(0)));
Assert.assertTrue(listState.getList().contains(globalUnionState.get(2)));
assertEquals(globalUnionState.size() / 2, listState.getList().size());
assertTrue(listState.getList().contains(globalUnionState.get(0)));
assertTrue(listState.getList().contains(globalUnionState.get(2)));
}

@Test
Expand Down Expand Up @@ -544,6 +543,15 @@ public List<T> getList() {
/**
 * Returns the current value of the {@code clearCalled} field.
 *
 * <p>NOTE(review): presumably the flag is set when {@code clear()} is invoked on this test
 * state; the code that writes the field is outside this view - confirm.
 *
 * @return the value of {@code clearCalled}
 */
public boolean isClearCalled() {
return clearCalled;
}

/**
 * Replaces the content of the backing list with the given values.
 *
 * <p>The backing list is always emptied first, so a {@code null} or empty argument leaves
 * it empty.
 *
 * @param values the new elements, may be {@code null} or empty
 * @throws Exception declared in the signature; this body does not throw a checked
 *     exception itself
 */
@Override
public void update(List<T> values) throws Exception {
    list.clear();

    // Both conditions must hold. With "||" a null argument fell through to
    // values.isEmpty() and raised a NullPointerException.
    if (values != null && !values.isEmpty()) {
        list.addAll(values);
    }
}
}

private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.util.MergeUtils;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalListState;
Expand Down Expand Up @@ -114,7 +115,6 @@ public void add(V value) throws IOException {
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
valueSerializer.serialize(value, out);
backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());

} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
Expand Down Expand Up @@ -158,4 +158,33 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
throw new Exception("Error while merging state in RocksDB", e);
}
}

@Override
public void update(List<V> values) throws Exception {
clear();

if (values != null && !values.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected behaviour that adding an empty list will not result in an entry? Maybe that is correct, but we should double check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's the behavior in rocksdb. Good to have a second eye on it https://github.com/facebook/rocksdb/blob/master/utilities/merge_operators/string_append/stringappend2.cc since I never wrote c++ before

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant is not a RocksDB question, but a question of whether the list state should return null or an empty iterator after the user has stored an empty list under a key. That might make a difference: right now, if the caller updates with an empty list, the get is identical to a non-existing mapping (null). Returning an empty list would require making an entry in RocksDB.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in that case, for all state backends, it returns null. I will add a unit test to verify and enforce it.

try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

List<byte[]> bytes = new ArrayList<>(values.size());
for (V value : values) {
keySerializationStream.reset();
valueSerializer.serialize(value, out);
bytes.add(keySerializationStream.toByteArray());
}

byte[] premerge = MergeUtils.merge(bytes);
if (premerge != null) {
backend.db.put(columnFamily, writeOptions, key, premerge);
} else {
throw new IOException("Failed pre-merge values");
}
} catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while updating data to RocksDB", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state.util;

import org.apache.flink.annotation.VisibleForTesting;

import java.util.List;

/**
* Utils to simulate StringAppendTestOperator's merge operations in RocksDB.
*/
public class MergeUtils {
@VisibleForTesting
protected static final byte DELIMITER = ',';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if we could initialize this from the RocksDB API somehow. But not sure if this is possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RocksDB doesn't expose that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that is unfortunate.


/**
* Merge operands into a single value that can be put directly into RocksDB.
*/
public static byte[] merge(List<byte[]> operands) {
if (operands == null || operands.size() == 0) {
return null;
}

if (operands.size() == 1) {
return operands.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fail if size == 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...which, in turn, is a corner case that the test should cover.

}

int numBytes = 0;
for (byte[] arr : operands) {
numBytes += arr.length + 1;
}
numBytes--;

byte[] result = new byte[numBytes];

System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);

for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
result[arrIndex] = DELIMITER;
arrIndex += 1;
System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
arrIndex += operands.get(i).length;
}

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state.benchmark;

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.util.MergeUtils;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.CompactionStyle;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
* Test that validates that the performance of APIs of RocksDB's ListState is as expected.
*
* Benchmarking
* Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i7, Memory 16GB 1600MHz DDR3
*
* Number of values added | time for add() | time for update() | perf improvement of update() over add()
* 500 978703 ns 55397 ns 17.66x
* 1000 3044179 ns 89474 ns 34.02x
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why the formatting is a few spaces behind others on github. They are aligned on my computer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that's an improvement that would justify the more complex approach. Nice!

* 5000 9247395 ns 305580 ns 30.26x
* 10000 16416442 ns 605963 ns 27.09x
* 50000 84311205 ns 5691288 ns 14.81x
* 100000 195103310 ns 12914182 ns 15.11x
* 500000 1223141510 ns 70595881 ns 17.33x
*
* In summary, update() API which pre-merges all values gives users 15-35x performance improvements.
* For most frequent use cases where there are a few hundreds to a few thousands values per key,
* users can get 30x - 35x performance improvement!
*
*/
public class RocksDBListStatePerformanceTest extends TestLogger {

    /** Provides a fresh directory that holds both the native library and the database files. */
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    /** Enables {@link RetryOnFailure} on the test method below. */
    @Rule
    public final RetryRule retry = new RetryRule();

    /**
     * Times two ways of writing {@code num} identical values under one key: {@code num}
     * individual merge calls on {@code key1}, and a single write of one value pre-merged by
     * {@link MergeUtils#merge(List)} on {@code key2}. Both durations are only logged; the
     * test makes no assertion on them.
     */
    @Test(timeout = 2000)
    @RetryOnFailure(times = 3)
    public void testRocksDbListStateAPIs() throws Exception {
        final File rocksDir = tmp.newFolder();

        // ensure the RocksDB library is loaded to a distinct location each retry
        NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());

        final String key1 = "key1";
        final String key2 = "key2";
        final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

        final byte[] keyBytes1 = key1.getBytes(StandardCharsets.UTF_8);
        final byte[] keyBytes2 = key2.getBytes(StandardCharsets.UTF_8);
        final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

        // The number of values added to ListState. Can be changed for benchmarking
        final int num = 1000;

        try (
            final Options options = new Options()
                .setCompactionStyle(CompactionStyle.LEVEL)
                .setLevelCompactionDynamicLevelBytes(true)
                .setIncreaseParallelism(4)
                .setUseFsync(false)
                .setMaxOpenFiles(-1)
                .setCreateIfMissing(true)
                .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);

            final WriteOptions writeOptions = new WriteOptions()
                .setSync(false)
                .setDisableWAL(true);

            final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {

            // ----- add() API -----
            // Progress is reported through the logger only, like every other phase below.
            log.info("begin add");

            final long beginInsert1 = System.nanoTime();
            for (int i = 0; i < num; i++) {
                rocksDB.merge(writeOptions, keyBytes1, valueBytes);
            }
            final long endInsert1 = System.nanoTime();

            log.info("end add - duration: {} ns", (endInsert1 - beginInsert1));

            // ----- update() API -----

            // The pre-merge happens outside the timed section, so only the write is measured.
            List<byte[]> list = new ArrayList<>(num);
            for (int i = 0; i < num; i++) {
                list.add(valueBytes);
            }
            byte[] premerged = MergeUtils.merge(list);

            log.info("begin update");

            // NOTE(review): the RocksDB list state update() in this change writes the
            // pre-merged bytes with put(), while this benchmark times merge() - confirm
            // which call is meant to be measured.
            final long beginInsert2 = System.nanoTime();
            rocksDB.merge(writeOptions, keyBytes2, premerged);
            final long endInsert2 = System.nanoTime();

            log.info("end update - duration: {} ns", (endInsert2 - beginInsert2));
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state.util;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertTrue;

/**
* Tests for MergeUtils.
*/
public class MergeUtilsTest {
@Test
public void testMerge() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should cover more corner cases, like empty list, singleton list, ...

List<byte[]> list = Arrays.asList(
new byte[4],
new byte[1],
new byte[2]);

byte[] expected = new byte[9];
expected[4] = MergeUtils.DELIMITER;
expected[6] = MergeUtils.DELIMITER;

assertTrue(Arrays.equals(expected, MergeUtils.merge(list)));

// Empty list
list = Arrays.asList();

assertTrue(Arrays.equals(null, MergeUtils.merge(list)));

// Singleton list
list = Arrays.asList(new byte[1]);

assertTrue(Arrays.equals(new byte[1], MergeUtils.merge(list)));
}
}