Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLINK-11050 add lowerBound and upperBound for optimizing RocksDBMapState's entries #7226

Closed
wants to merge 1 commit into from

Conversation

Myracle
Copy link
Contributor

@Myracle Myracle commented Dec 4, 2018

(The sections below can be removed for hotfixes of typos)
-->

What is the purpose of the change

This pull request optimizes the seek of RocksDBMapState's entries by assigning lowerBound and upperBound.

Brief change log

  • *Add entries(lowerBound, upperBound) in MapState and implement it in RocksDBMapState.
    -*Use entries(lowerBound, upperBound) instead of entries() in IntervalJoin.java when get buffer's values.

Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (don't know)

Documentation

  • Does this pull request introduce a new feature? (no)

@fhueske
Copy link
Contributor

fhueske commented Dec 4, 2018

Hi @Myracle, thanks for the PR. I think we should either support the new API for all MapState implementations or declare the method as a best-effort filter (which means we have to manually filter the returned entries).

What do you think about this @StefanRRichter?

Best, Fabian

@Myracle
Copy link
Contributor Author

Myracle commented Dec 7, 2018

Hi @Myracle, thanks for the PR. I think we should either support the new API for all MapState implementations or declare the method as a best-effort filter (which means we have to manually filter the returned entries).

What do you think about this @StefanRRichter?

Best, Fabian

Thank you for your reply. The lowerBound and upperBound are used by RocksDB's interface to avoid wasting time on deleted values. The situation is not the same when storing state in heap. Do you still think that we should support the new API for all MapState implementations? Also, I do not understand the meaning of "declare the method as a best-effort filter". Because previous implementation will return all the values and it is just the bottleneck in our situation.

@StefanRRichter
Copy link
Contributor

I think this suggestion is problematic for the following reasons:

  1. Map state is in general unordered
  2. Keys in map state are not required to be Comparable, so what defines the order against which we can compare upper and lower keys?
  3. Even if keys implement, Comparable, their order in RocksDB is depending on the lexicographical order of there bytes in serialized form, which can be a differnt order from what can be defined in the compareTo()
  4. The method is doing different things for RocksDB and Heap right now and this is not properly reflecting in the documentation of the method.

Overall that leads me to the conclusion that we cannot rush to add this optimization but need a bit more careful thinking, e.g. introducing a subclass OrderedMapState (openly or hidden and cast where optimization is required). Even in that case we need to be careful when addressing the problem of different orders (Comparable vs byte-lexicographical) and I feel that needs more thought. So currently I am leaning towards 👎 for the suggested approach.

@Myracle
Copy link
Contributor Author

Myracle commented Dec 25, 2018

Thank you @StefanRRichter for your reply. The items you mentioned are all right and I learn a lot from them. But consider the great performance after my optimization, I think that this will help more persons. My code is already used in our company's online platform and it is stable for a long time. We just use IntervalJoin to deal our data.

Following your suggestions and for code-simple, I think it's better to add filter(lowerBound, upperBound) rather than entries(lowerBound, upperBound) in MapState in semantics, although their function is the same. Also, we add isIntervalJoinSeekOptimization in config to open the optimization. This optimization is not open to users.

Currently, only rocksDB's filter is supported. Because rocksDB's storage is different than others and only rocksDB supports large state . Also, rocksDB supports lowerBound and upperBound interfaces for users to optimize seek. Consider the specific implement for rocksDB, we only open the optimization by config parameter.

For comparable, we will give a note in the comment to warn developers of key-types. Anyone who wants to use this function must guarantee the comparison for the key. As for my case in the intervalJoinOperator, the key is timestamp and it is comparable in byte-lexicographical.

For upperBound, rocksDB does't support setIterateUpperBound in ReadOptions until the rocksdbjni version 5.9.2. Flink rocksdbjni's version is 5.7.5.

Above is my thought. The code is modified too. Thank you.

@Myracle Myracle force-pushed the flink-11050 branch 2 times, most recently from 1cd7460 to d966332 Compare December 26, 2018 03:41
@StefanRRichter
Copy link
Contributor

StefanRRichter commented Jan 8, 2019

@Myracle I don't doubt about the usefulness or stability of the approach, the concern is that it does not generalize well beyond your use-case of integer values. Even for integer values, the semantics for upper and lower would fail if you consider negative integers, where the byte sequence for negative values is lexicographically after those of positive values.
You could say, that we can just have a check that minByteString <= maxByteString, but even there in a general (non-integer) case you could miss elements that you expect to see in between min and max by Comparable from the result of the method if it is not aligned with the serialized byte sequence.
I am not in favor of having some new method that looks like a general API enhancement, but then only works for a subset of cases and can fail sementically (not even always with an error) for other cases.
What I would suggest instead is thinking about a more lower level api change that generalizes well and can be used as building block to support this use case on top of it.

Please also note that we could not yet update the RocksDB version, because they had a performance regression in the merge operator, but this should be fixed soon.

@opnarius
Copy link

Hello, any more thoughts on implementing ordering and filtering of MapState? This would really boost the inner join performance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants