Flink 2.0: Replace Caffeine maxSize cache with LRUCache #13382
Conversation
Force-pushed from 715be14 to 0689560.
Caffeine is a multi-threaded cache with an adaptive eviction policy that maximizes the hit rate based on the observed workload. This incurs additional overhead but can greatly improve overall system performance. I adjusted Caffeine's benchmark to run single-threaded and with 16 threads on my 14-core M3 Max laptop using OpenJDK 24. It uses a Zipfian distribution to simulate hot/cold items with a 100% hit rate. This was not a clean system, as I was on a conference call while writing code, which only hurt Caffeine since it will utilize all of the cores. In a cloud environment you will likely observe worse throughput due to virtualization, NUMA effects, noisy neighbors, older hardware, etc. In general you want to drive application performance decisions by profiling to resolve hotspots, as small optimizations can backfire when your benchmark does not fit your real workload.
Thank you for the detailed reply.
It was unclear from the Caffeine project description that the cache is specifically optimised for high concurrency. I also didn't find any single-threaded benchmarks online, so I wrote my own here, which gave these results:
I guess it makes sense that Caffeine performs worse in a single-threaded scenario due to thread synchronisation. It might be worth clarifying this on the project page so it's more visible to users.
Yep, yours are reasonable, but you don't need the loop and can let JMH handle it for better clarity of the results. Those are 100k calls per unit. It's not that much worse, as 44M vs 76M reads/s is far faster than typically needed. Usually those who care need primitive collections and are very specialized. The much better hit rate than LRU more than compensates, because milliseconds for an extra miss outweigh saving a few nanoseconds per hit. A LinkedHashMap (LHM) as an LRU is really great, but LRU isn't as good as people assume.
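For illustration, a JMH benchmark without a manual loop could look like the sketch below. The class name, key pattern, and cache size are made up; it measures single-threaded reads at a 100% hit rate over a round-robin key sequence rather than the Zipfian distribution mentioned above, so treat it as a template rather than a reproduction of either benchmark.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

// Hypothetical benchmark class; run single-threaded (JMH's default),
// because the LinkedHashMap variant is not thread-safe.
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class CacheReadBenchmark {

  private static final int MAX_SIZE = 1_000;

  private Cache<Integer, Integer> caffeine;
  private Map<Integer, Integer> lru;
  private int next;

  @Setup
  public void setUp() {
    caffeine = Caffeine.newBuilder().maximumSize(MAX_SIZE).build();
    // Access-ordered LinkedHashMap that evicts its eldest entry beyond MAX_SIZE
    lru =
        new LinkedHashMap<Integer, Integer>(MAX_SIZE, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
            return size() > MAX_SIZE;
          }
        };
    for (int i = 0; i < MAX_SIZE; i++) {
      caffeine.put(i, i);
      lru.put(i, i);
    }
  }

  @Benchmark
  public Integer caffeineRead() {
    // One cache read per invocation; JMH drives the measurement loop
    return caffeine.getIfPresent(nextKey());
  }

  @Benchmark
  public Integer lruRead() {
    return lru.get(nextKey());
  }

  private int nextKey() {
    // Round-robin over resident keys: 100% hit rate, but not Zipfian
    next = (next + 1) % MAX_SIZE;
    return next;
  }
}
```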
Thanks for the PR @aiborodin, and @ben-manes for the nice detailed test and explanation. We are debating sharing the TableCache at the JVM level. Your highlights about concurrent cache access will be a very useful data point in that discussion. Currently the cache access is single-threaded, so we can get away with the suggested LHM solution.
Force-pushed from 0689560 to d430d54.
That's a clever find! As Peter said, we want to eventually share this cache across all components of DynamicSink. We might want to re-evaluate then, but this is good for now.
What about the other instances of Caffeine in the Flink module? E.g. TableSerializerCache, HashKeyGenerator, DynamicWriteResultAggregator, DynamicWriter (they all use Caffeine).
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java (outdated, resolved review thread)
@aiborodin: Do you want to check the other instances of the cache mentioned by @mxm in this PR, or do you want to do them in another PR?
We recently discovered that LRUCache, based on LinkedHashMap, performs almost twice as fast as the Caffeine max-size cache. Let's replace the Caffeine cache to optimise performance.
Force-pushed from d430d54 to c7afaa2.
I replaced all Caffeine caches bounded by a maximum size. There are still caches configured with time-based expiration and soft values:

    this.specs =
        Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
    this.outputFileFactories =
        Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();

Should we replace those with LRUCache as well?
We can probably replace that one as well. There is nothing inherently different about that cache.
@@ -90,7 +90,6 @@ void testCachingDisabled() {
    TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);

-   // Cleanup routine doesn't run after every write
-   cache.getInternalCache().cleanUp();
-   assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
+   assertThat(cache.getInternalCache().size()).isEqualTo(0);
assertThat(cache.getInternalCache()).isEmpty()
We can use this everywhere.
This is a bit more descriptive.
-   // Manually clean up because the cleanup is not always triggered
-   keySelectorCache.cleanUp();
-   assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+   assertThat(keySelectorCache.size()).isEqualTo(1);
assertThat(keySelectorCache).hasSize(1)
We can use this everywhere.
This is a bit more descriptive.
    assertThat(cacheItem).isNotNull();

    tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-   assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem);
+   assertThat(cache.getInternalCache().get(tableIdentifier)).isEqualTo(cacheItem);
Can we do a contains?
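If getInternalCache() exposes a Map, the suggestion could read roughly as follows (a sketch assuming AssertJ's map assertions and a static import of Assertions.assertThat):

```java
// Sketch: verifies the entry is present and mapped to the expected item
assertThat(cache.getInternalCache()).containsEntry(tableIdentifier, cacheItem);
```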
@@ -153,7 +153,6 @@ void testLastResultInvalidation() {
        .isEqualTo(CompareSchemasVisitor.Result.SAME);

    // Last result cache should be cleared
-   assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2))
-       .isNull();
+   assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas().get(SCHEMA2)).isNull();
Can we do a doesNotContain?
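Similarly, assuming inputSchemas() returns a Map, a sketch of that assertion could be:

```java
// Sketch: the invalidated schema should no longer be a key in the inner map
assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas())
    .doesNotContainKey(SCHEMA2);
```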
Before we move forward, let's take a step back and consider the size of the cache and the cost of a cache miss. How many items do we need in the cache for optimal operation:
This one looks like number 3 to me. Also, the cache is only accessed a few times during a checkpoint, so the performance is less important. So to me this seems better handled by Caffeine, but I could be convinced. Also, please revisit the previous decisions on the cache sizes and reconsider if appropriate. Thanks.
You are right, Peter, the cache size should probably be proportional to the number of "active" tables, but with a time-based eviction policy we have seen worse performance. That was the reason we switched to max size / LRU. So there should probably be an LRU eviction policy not across ALL tables, but per table. We could extend LRUCache to use a LinkedList per table. We would then probably have to compare it again to something like Caffeine to decide whether the custom cache implementation is still worth it.
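For illustration, a per-table LRU could nest a bounded, access-ordered map per table inside an outer map, roughly like the sketch below (hypothetical class and names, not the proposal's actual design, and not thread-safe):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of per-table LRU eviction: each table gets its own bounded,
 * access-ordered map, so a hot table cannot evict entries of other tables.
 */
public class PerTableLruCache<T, K, V> {

  private final int maxEntriesPerTable;
  private final Map<T, Map<K, V>> perTable = new HashMap<>();

  public PerTableLruCache(int maxEntriesPerTable) {
    this.maxEntriesPerTable = maxEntriesPerTable;
  }

  public V get(T table, K key) {
    Map<K, V> entries = perTable.get(table);
    return entries == null ? null : entries.get(key);
  }

  public void put(T table, K key, V value) {
    perTable
        .computeIfAbsent(
            table,
            unused ->
                // Access order + removeEldestEntry gives LRU eviction per table
                new LinkedHashMap<K, V>(16, 0.75f, true) {
                  @Override
                  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                    return size() > maxEntriesPerTable;
                  }
                })
        .put(key, value);
  }
}
```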
IIUC, we access this cache only a few times every checkpoint: the sum of (table x parallelism for the table). Not very few, but not for every record. Probably isn't worth the extra complexity.
We recently discovered that LRUCache, based on LinkedHashMap, has almost twice the throughput of the Caffeine cache with a maximum size configured. Please see the JMH benchmark results here. Let's use LRUCache in TableMetadataCache to improve the cache performance of the DynamicIcebergSink.
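For reference, an LRU cache built on LinkedHashMap typically relies on access ordering and removeEldestEntry, as in the minimal sketch below (illustrative only; the actual LRUCache class in the code base may differ, and this variant is not thread-safe):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal LRU cache sketch backed by an access-ordered LinkedHashMap. */
public class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {

  private final int maxSize;

  public SimpleLruCache(int maxSize) {
    // accessOrder = true moves an entry to the tail on every get/put
    super(16, 0.75f, true);
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Evict the least recently used entry once the bound is exceeded
    return size() > maxSize;
  }
}
```

Eviction happens inline during put, so there is no separate cleanUp() step and no concurrent bookkeeping, which is where the single-threaded throughput difference against Caffeine comes from.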