Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7475] [core][DataStream API] support update() in ListState #4963

Closed
wants to merge 19 commits into from
Closed

[FLINK-7475] [core][DataStream API] support update() in ListState #4963

wants to merge 19 commits into from

Conversation

bowenli86
Copy link
Member

@bowenli86 bowenli86 commented Nov 7, 2017

What is the purpose of the change

If users want to update the list, they have to do two steps:

listState.clear() 
for (Element e : myList) { 
    listState.add(e); 
} 

We should enable users to do such actions by providing an API listState.update(myNewList)

Brief change log

  • Added update() API to ListState
  • Added and updated unit/IT tests
  • Updated Flink doc

update() can actually be an API in AppendingState, or at least in MapState. We can consider it later.

Verifying this change

This change added tests and can be verified in testListStateAddUpdateAndGet()

Does this pull request potentially affect one of the following parts:

  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs and JavaDocs)

Performance Benchmarking

Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i7, Memory 16GB 1600MHz DDR3

Number of values added | time for add() API | time for update() API | perf improvement of update() over add()

  • 500 978703 ns 55397 ns 17.66x
  • 1000 3044179 ns 89474 ns 34.02x
  • 5000 9247395 ns 305580 ns 30.26x
  • 10000 16416442 ns 605963 ns 27.09x
  • 50000 84311205 ns 5691288 ns 14.81x
  • 100000 195103310 ns 12914182 ns 15.11x
  • 500000 1223141510 ns 70595881 ns 17.33x

In summary, update() API which pre-merges all values gives users 15-35x performance improvements.
For most frequent use cases where there are a few hundreds to a few thousands values per key, users can get 30x - 35x performance improvement!

@bowenli86 bowenli86 changed the title [FLINK-7475] [core][state backend] support update() in ListState [FLINK-7475] [core][DataStream API] support update() in ListState Nov 8, 2017
@yunfan123
Copy link
Contributor

In the rocksdb implement, it still use db.merge to add each element to the rocksdb.
I think we can get the specific MergeOperator class from rocksdb config and merge the list as a string.
So we can only use db.put once to update the list.

@bowenli86
Copy link
Member Author

Hi @yunfan123 Thanks for the feedback, and I agree with you.

Well, here's the problem: Flink seems to be using the StringAppendTestOperator as merge operator from RocksDB, which added a ',' between values. But I didn't find how a java class for StringAppendTestOperator.

Shall we mimic StringAppendTestOperator and concatenate the values with ',' ourselves? Do you have any more suggestions?

@yunfan123
Copy link
Contributor

Flink use StringAppendOperator as merge operator. It used in org.apache.flink.contrib.streaming.state.PredefinedOptions.
It use native java method.
Can we use the native method to merge our list?

@bowenli86
Copy link
Member Author

bowenli86 commented Nov 13, 2017

@yunfan123 sorry I can't find where PredefinedOptions defines it. Isn't Flink using RocksDB's StringAppendTESTOperator in RocksDBKeyedStateBackend? And the StringAppendOperator java class in rocksdbjni doesn't expose any methods for us to use.

Hi @aljoscha , do you have insightful suggestions?

@aljoscha
Copy link
Contributor

I don't know if there's a way of retrieving the class behind the "stringappendtest" merge operator. If not, we should just simulate the behaviour of appending with commas in Java code. @StefanRRichter what do you think?

@StefanRRichter
Copy link
Contributor

@aljoscha IIRC there is no class for StringAppendTESTOperator in the RocksDB Java API. You can only pass a string that is used inside some factory in the native RocksDB code to instantiate the append operator. I would also suggest to simulate it, either "shallow" by adding commas or "deeper" by using a dummy RocksDB instance internally to actually perform the merge and query for the result.

@bowenli86
Copy link
Member Author

bowenli86 commented Nov 14, 2017

@yunfan123 @aljoscha @StefanRRichter I chose the "shallow" simulation. What do you guys think about the new code?

The build failure seems to be because one build profile timed out.

@bowenli86
Copy link
Member Author

@yunfan123 @aljoscha @StefanRRichter feedbacks are appreciated

}

if (operands.size() < 2) {
return operands.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fail if size == 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...which, in turn, is a corner case that the test should cover.

public void update(List<S> values) throws Exception {
internalList.clear();

if (values != null && !values.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think the second check is not required and makes the statement more verbose for no good reason.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it makes a difference as we chatted about empty list case

final N namespace = currentNamespace;
final StateTable<K, N, ArrayList<V>> map = stateTable;

map.put(namespace, new ArrayList<>(values));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to make a defensive copy of the list?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because values is of List and map only is of ArrayList, need to enforce strict subclass

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, I wonder if changing the generic type of the map to List is not the better solution? Maybe @StephanEwen had the intention to enforce a somewhat efficient list implementation here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I wanted to enforce using ArrayList in the internal state when possible, because users never interacted with the list directly. We don't necessarily have to keep that, could relax it to List to avoid extra copies. That should be a ground rule in all of Flink's runtime code: No extra work to work around current code.

Background: I initially strongly typed to ArrayList to make it clear that we want a compact and efficient list implementation. Because I have seen too many times that LinkedList (which is for most cases so much slower) is used as the default list (I blame this on University education, which talks about how theoretical complexity of LinkedList is lower, but fails to actually take processor architecture into account)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StephanEwen @StefanRRichter I tried to relax the type but the change scope is much bigger than I thought. I recommend handle that as a separate task. I filed FLINK-8365 and listed a couple issues I found when trying to relax the type.

*/
public class MergeUtilsTest {
@Test
public void testMerge() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should cover more corner cases, like empty list, singleton list, ...

*/
public class MergeUtils {
@VisibleForTesting
protected static final byte DELIMITER = ',';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if we could initialize this from the RocksDB API somehow. But not sure if this is possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RocksDB doesn't expose that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that is unfortunate.

public void update(List<V> values) throws Exception {
clear();

if (values != null && !values.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected behaviour that adding an empty list will not result in an entry? Maybe that is correct, but we should double check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's the behavior in rocksdb. Good to have a second eye on it https://github.com/facebook/rocksdb/blob/master/utilities/merge_operators/string_append/stringappend2.cc since I never wrote c++ before

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant, is not a RocksDB question, but a question if the list state should return null or empty iterator if after the user stored an empty list under a key. That might make a difference and right now, I the caller updates with empty list, the get is identical to a non-existing mapping (null). Returning an empty list would require to make an entry in rocksdb.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in that case, for all state backends, it returns null. I will add a unit test to verify and enforce it.

bytes.add(keySerializationStream.toByteArray());
}

backend.db.put(columnFamily, writeOptions, key, MergeUtils.merge(bytes));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have some numbers that this custom merging is better than multiple calls to RocksDB merging state? I can see that it is potentially a lot better, but just making sure it is worth the additional complexity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. I will do some benchmarking

*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void update(List<T> values) throws Exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should define and document what happens if this is called with null or empty list. Does this lead to different results for get() ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me document it. They are of the same result - return null when given an empty list

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if the outcome for both cases should be the same, wouldn't it be nicer to always return the empty iterator and avoid nulls if we can? This might be a small thing but save users from debugging some NPEs :-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StefanRRichter agree. The correct way to do should be changing ListState#get(). I filed FLINK-8364 and will work on it after this PR is checked in

@bowenli86
Copy link
Member Author

@StefanRRichter I added a perf benchmarking unit test, you can find the perf results in both this PR's description and the comment of that test. The perf improvement is pretty magnificent, between 15x -35x faster.

*
* Number of values added | time for add() | time for update() | perf improvement of update() over add()
* 500 978703 ns 55397 ns 17.66x
* 1000 3044179 ns 89474 ns 34.02x
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why the formatting is a few spaces behind others on github. They are aligned on my computer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that's an improvement that would justify the more complex approach. Nice!

@StefanRRichter
Copy link
Contributor

Alright, I think this looks good to me now. @bowenli86 would you also like to work on the followup issues that you created?

I will merge this today or tomorrow.

@bowenli86
Copy link
Member Author

@StefanRRichter Thanks! I will

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants