feat(flink): Off-heap lookup join cache backed by RocksDB#18231
Open
wombatu-kun wants to merge 1 commit intoapache:masterfrom
Open
feat(flink): Off-heap lookup join cache backed by RocksDB#18231wombatu-kun wants to merge 1 commit intoapache:masterfrom
wombatu-kun wants to merge 1 commit intoapache:masterfrom
Conversation
e11c3e2 to
5df9db0
Compare
Collaborator
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #18231 +/- ##
============================================
- Coverage 57.28% 57.27% -0.02%
+ Complexity 18532 18530 -2
============================================
Files 1944 1944
Lines 106126 106126
Branches 13118 13118
============================================
- Hits 60793 60781 -12
- Misses 39610 39620 +10
- Partials 5723 5725 +2
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Describe the issue this Pull Request addresses
When using Hudi as a lookup join dimension table in Flink, the entire table is loaded into the cache on each reload. If dimension table is large - we got OOM.
Summary and Changelog
Introduced
LookupCache— a minimal interface (addRow, getRows, clear, close) that abstracts the cache backend from the lookup function.HeapLookupCache— wraps the original HashMap behavior; default, backward-compatible.RocksDBLookupCache— off-heap implementation.Rows are written to RocksDB one-by-one during reload (no intermediate HashMap) so heap never spikes to O(n)
Each row gets its own RocksDB entry with a compound key _, enabling efficient prefix-scan lookup
Flink's TypeSerializer handles binary serialization; a PassThroughSerializer stores the raw bytes without extra Java-serialization overhead
clear() tears down and re-creates the RocksDBDAO (cleaning up the old temp directory automatically)
Added two new config options to
FlinkOptions:lookup.join.cache.type=heap(default) |rocksdblookup.join.rocksdb.path= local directory path, default${java.io.tmpdir}/hudi-lookup-rocksdbHoodieLookupFunctionrefactored to useLookupCacheinterface.Also fixes a pre-existing bug where currentCommit was never set after a successful load, causing redundant full reloads on every TTL expiry
Impact
Solved OutOfMemoryError on lookup joins with large Hudi tables.
Risk Level
none
Documentation Update
Need to add two new config options to documentation:
lookup.join.cache.typeandlookup.join.rocksdb.path.Contributor's checklist