[FLINK-9491] Implement timer data structure based on RocksDB #6227
What is the purpose of the change
This PR is another step towards integrating the timer state with the keyed state backends.
First, the PR generalizes the data structure InternalTimerHeap to InternalPriorityQueue, so that the functionality of a heap-set-organized state is decoupled from storing timers. The main reason for this is that state/backend related code lives in flink-runtime, while timers are a concept from flink-streaming.

Second, the PR also introduces an implementation of
InternalPriorityQueue with set semantics (i.e. the data structure we require to manage timers) that is based on RocksDB. State in RocksDB is always partitioned into key-groups, so the general idea is to organize the implementation as a heap-of-heaps, where each sub-heap represents the elements of exactly one key-group and the structure merges by priority across the key-group boundaries. The implementation reuses the in-memory implementation of InternalPriorityQueue (without set properties) as the super-heap that holds the sub-heaps. Furthermore, each sub-heap is an instance of CachingInternalPriorityQueueSet, consisting of a "fast", "small" cache (OrderedSetCache) and a "slow", "unbounded" store (OrderedSetStore), currently with simple write-through synchronization between cache and store. In the current implementation, the cache is based on an AVL tree and restricted in capacity; the store is backed by a RocksDB column family. We use caching to reduce read accesses to RocksDB.

Please note that the RocksDB implementation is currently not yet integrated with the timer service or the backend. This will happen in the next steps.
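To make the write-through idea concrete, here is a minimal, hypothetical sketch of a cache-in-front-of-store ordered set. The class and field names are illustrative, not Flink's; TreeSets stand in for both the AVL-tree cache and the RocksDB-backed store (TreeSet also provides the set semantics for free).

```java
import java.util.TreeSet;

// Hypothetical sketch (not Flink's actual code) of the write-through idea in
// CachingInternalPriorityQueueSet: a small, bounded, ordered cache in front of
// an unbounded ordered store.
class CachedOrderedSet {
    private final int cacheCapacity;
    private final TreeSet<Long> cache = new TreeSet<>(); // "fast", "small"
    private final TreeSet<Long> store = new TreeSet<>(); // "slow", "unbounded" stand-in

    CachedOrderedSet(int cacheCapacity) {
        this.cacheCapacity = cacheCapacity;
    }

    /** Write-through add: every element goes to the store; the cache keeps only the smallest. */
    void add(long element) {
        store.add(element);
        cache.add(element);
        if (cache.size() > cacheCapacity) {
            cache.pollLast(); // evict the largest cached element
        }
    }

    /** The cache always holds the smallest elements, so it can answer peek() without a store read. */
    Long peek() {
        if (!cache.isEmpty()) {
            return cache.first();
        }
        return store.isEmpty() ? null : store.first();
    }

    /** Write-through removal of the head; refill the cache from the store when it drains. */
    Long poll() {
        Long head = peek();
        if (head == null) {
            return null;
        }
        cache.remove(head);
        store.remove(head);
        if (cache.isEmpty()) {
            for (Long e : store) { // ascending order: pull the smallest back into the cache
                if (cache.size() >= cacheCapacity) {
                    break;
                }
                cache.add(e);
            }
        }
        return head;
    }
}
```

The point of keeping the cache a strict "lower set" of the store is that peek() and most poll() calls never touch the slow store, which is exactly why caching reduces RocksDB read accesses.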
Brief change log
- Generalized InternalTimerHeap to decouple it from timers and moved the data structures from flink-streaming to flink-runtime (-> InternalPriorityQueue).
- Split the in-memory implementation into a basic heap (HeapPriorityQueue) and a heap extended with set semantics (HeapPriorityQueueSet).
- Introduced a RocksDB-based implementation of InternalPriorityQueue with set semantics. Starting point is KeyGroupPartitionedPriorityQueue. This class uses a HeapPriorityQueue of CachingInternalPriorityQueueSet elements, each of which contains the elements of exactly one key-group (heap-of-heaps). For RocksDB, we configure each CachingInternalPriorityQueueSet to use a TreeOrderedSetCache and a RocksDBOrderedStore.

Verifying this change
I added dedicated tests for all data structures.
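As an illustration of the central invariant such tests assert, a heap-of-heaps must return elements in global priority order no matter which key-group sub-heap they live in. The sketch below is a simplified, hypothetical stand-in, not the actual KeyGroupPartitionedPriorityQueue; the names and the key-group assignment are invented for the example.

```java
import java.util.PriorityQueue;

// Simplified, hypothetical stand-in for the heap-of-heaps structure: one
// sub-queue per key-group, plus a super-heap that orders the non-empty
// sub-queues by the priority of their current head element.
class PartitionedQueue {
    private final PriorityQueue<Long>[] subQueues;
    private final PriorityQueue<PriorityQueue<Long>> superHeap;
    private final int numKeyGroups;

    @SuppressWarnings("unchecked")
    PartitionedQueue(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
        this.subQueues = new PriorityQueue[numKeyGroups];
        for (int i = 0; i < numKeyGroups; i++) {
            subQueues[i] = new PriorityQueue<>();
        }
        // empty sub-queues are never inserted into the super-heap, so peek() is safe here
        this.superHeap = new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));
    }

    /** Inserts the element into its key-group's sub-queue (assignment is simplified). */
    void add(long element) {
        PriorityQueue<Long> sub = subQueues[(int) (element % numKeyGroups)];
        superHeap.remove(sub); // the sub-queue's head may change, so re-position it
        sub.add(element);
        superHeap.add(sub);
    }

    /** Removes and returns the globally smallest element across all key-groups. */
    Long poll() {
        PriorityQueue<Long> first = superHeap.poll();
        if (first == null) {
            return null;
        }
        Long head = first.poll();
        if (!first.isEmpty()) {
            superHeap.add(first); // re-insert with its new head element
        }
        return head;
    }
}
```

A test against such a structure would insert elements that land in different key-groups and assert that repeated poll() calls yield a globally sorted sequence.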
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): (no)

Documentation