
Flink 2.0: Replace Caffeine maxSize cache with LRUCache #13382


Open · aiborodin wants to merge 1 commit into main from use-lru-cache-for-table-metadata

Conversation

aiborodin
Contributor

We recently discovered that LRUCache, based on LinkedHashMap, has almost twice the throughput of a Caffeine cache with a maximum size configured. Please see the JMH benchmark results here.

Let's use LRUCache in TableMetadataCache to improve the cache performance of the DynamicIcebergSink.
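
For readers less familiar with the pattern: an access-ordered LinkedHashMap gives LRU semantics almost for free, which is why it is so fast in the single-threaded case. A minimal sketch (the class name and sizing are illustrative, not the exact LRUCache used in this PR):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Minimal LRU cache sketch on top of LinkedHashMap. accessOrder=true makes
    // get() move an entry to the tail, and removeEldestEntry evicts the head
    // (the least recently used entry) once the bound is exceeded.
    class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
      private final int maximumSize;

      SimpleLruCache(int maximumSize) {
        super(16, 0.75f, true); // true = iterate in access order, not insertion order
        this.maximumSize = maximumSize;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maximumSize;
      }
    }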

@github-actions github-actions bot added the flink label Jun 25, 2025
@manuzhang manuzhang changed the title Use LRUCache for Iceberg table metadata Flink 2.0: Use LRUCache for Iceberg table metadata Jun 25, 2025
@aiborodin aiborodin force-pushed the use-lru-cache-for-table-metadata branch from 715be14 to 0689560 Compare June 25, 2025 09:54
@ben-manes

Is TableMetadataCache single-threaded like your benchmark? If so, this is what LinkedHashMap is optimal for, since the LRU updates are simple pointer swaps. If not, you will have to synchronize access because it is not thread-safe; concurrent usage will cause corruption and instability. This is required for every read (per the javadoc), since every access mutates the LRU order. You also have to be thoughtful about the cache hit rate, because a cache miss will be far more expensive than the in-memory operations, e.g. it will have to perform I/O. If the workload is recency-biased then LRU is perfect, but if it has frequency bias or scans then it can be quite poor.
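
(For reference, the LinkedHashMap javadoc suggests wrapping an access-ordered map at creation time if it could ever be shared across threads; a minimal sketch of that wrapper, with illustrative types:)

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    class SynchronizedLruExample {
      public static void main(String[] args) {
        // Wrap at creation time so no unsynchronized reference escapes; every
        // get() mutates the LRU ordering, so even reads must hold the lock.
        Map<String, String> lru =
            Collections.synchronizedMap(new LinkedHashMap<>(16, 0.75f, true));
        lru.put("a", "1");
        lru.get("a"); // safe: the wrapper synchronizes reads as well as writes
      }
    }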

Caffeine is a multi-threaded cache with an adaptive eviction policy that maximizes the hit rate based on the observed workload. This incurs additional overhead but can greatly improve overall system performance.
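
For comparison, the equivalent size-bounded Caffeine cache (a sketch using the standard builder API; the bound here is illustrative):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    class CaffeineMaxSizeExample {
      public static void main(String[] args) {
        // Thread-safe, size-bounded cache; eviction uses Caffeine's adaptive
        // W-TinyLFU policy rather than strict LRU.
        Cache<String, String> cache =
            Caffeine.newBuilder().maximumSize(10_000).build();
        cache.put("a", "1");
        String value = cache.getIfPresent("a");
      }
    }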

I adjusted Caffeine's benchmark to run single-threaded and with 16 threads on my 14-core M3 Max laptop using OpenJDK 24. It uses a Zipfian distribution to simulate hot/cold items with a 100% hit rate. This was not a clean system, as I was on a conference call while writing code, which only hurt Caffeine since it utilizes all of the cores. In a cloud environment you will likely observe worse throughput due to virtualization, NUMA effects, noisy neighbors, older hardware, etc. In general you want to drive application performance decisions by profiling to resolve hotspots, as small optimizations can backfire when your benchmark does not fit your real workload.

[Screenshot: Caffeine benchmark results, single-threaded and 16 threads, 2025-06-25]

@aiborodin
Contributor Author

Thank you for the detailed reply.

TableMetadataCache is single-threaded, so it's safe to use LRUCache, based on LinkedHashMap.

It was unclear from the Caffeine project description that the cache is specifically optimised for high concurrency. I also couldn't find any single-threaded benchmarks online, so I wrote my own here, which gave these results:

Benchmark                                     Mode  Cnt     Score     Error  Units
CacheBenchmark.testCaffeineCacheMaxSize_Get  thrpt    5   929.031 ±  84.510  ops/s
CacheBenchmark.testCaffeineCacheMaxSize_Put  thrpt    5   548.677 ±  12.191  ops/s
CacheBenchmark.testLRUCache_Get              thrpt    5  1657.313 ±  71.981  ops/s
CacheBenchmark.testLRUCache_Put              thrpt    5  1206.151 ± 112.609  ops/s

I guess it makes sense that Caffeine performs worse in a single-threaded scenario due to thread synchronisation. It might be worth clarifying this on the project page so it's more visible to users.

@ben-manes

Yep, yours are reasonable, but you don't need the inner loop; you can let JMH handle the iterations for clearer results. As written, each reported op is 100k calls.
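
A loop-free version might look like this sketch, reusing the illustrative SimpleLruCache from above; JMH invokes the @Benchmark method repeatedly itself and reports per-call throughput:

    import java.util.concurrent.ThreadLocalRandom;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    public class CacheGetBenchmark {
      private SimpleLruCache<Integer, Integer> cache; // hypothetical cache under test

      @Setup
      public void setup() {
        cache = new SimpleLruCache<>(10_000);
        for (int i = 0; i < 10_000; i++) {
          cache.put(i, i);
        }
      }

      @Benchmark
      public Integer get() {
        // One read per invocation; JMH handles the iteration, so the reported
        // ops/s is directly comparable across caches without a manual loop.
        return cache.get(ThreadLocalRandom.current().nextInt(10_000));
      }
    }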

It’s not that much worse: 44M vs 76M reads/s is far faster than typically needed. Usually those who care need primitive collections and are very specialized. The much better hit rate more than compensates, because the milliseconds spent on an extra miss outweigh saving a few nanoseconds per hit. LHM as an LRU is really great, but LRU isn’t as good a policy as people assume.

@pvary
Contributor

pvary commented Jun 26, 2025

Thanks for the PR @aiborodin, and @ben-manes for the nice, detailed test and explanation. We are debating sharing the TableCache at the JVM level. Your highlights about concurrent cache access will be a very useful data point in that discussion.

Currently the cache access is single-threaded, so we can get away with the suggested LHM solution.

@aiborodin aiborodin force-pushed the use-lru-cache-for-table-metadata branch from 0689560 to d430d54 Compare June 27, 2025 07:22
@aiborodin
Contributor Author

Thanks for your comment @pvary. I rebased this change on top of the merged #13340, so this should be ready for review.

Contributor

@mxm mxm left a comment

That's a clever find! As Peter said, we want to eventually share this cache across all components of DynamicSink. We might want to re-evaluate then, but this is good for now.

What about the other instances of Caffeine in the Flink module? E.g. TableSerializerCache, HashKeyGenerator, DynamicWriteResultAggregator, DynamicWriter (they all use Caffeine).

@pvary
Contributor

pvary commented Jun 27, 2025

@aiborodin: Do you want to check the other instances of the cache mentioned by @mxm in this PR, or do you want to do them in another PR?

We recently discovered that LRUCache, based on LinkedHashMap, performs
almost twice as fast as the Caffeine max-size cache. Let's replace the
Caffeine cache to optimise performance.
@aiborodin aiborodin force-pushed the use-lru-cache-for-table-metadata branch from d430d54 to c7afaa2 Compare June 30, 2025 06:35
@aiborodin aiborodin changed the title Flink 2.0: Use LRUCache for Iceberg table metadata Flink 2.0: Replace Caffeine maxSize cache with LRUCache Jun 30, 2025
@aiborodin
Contributor Author

aiborodin commented Jun 30, 2025

I replaced all Caffeine maximumSize caches with LRUCache in this PR. The only one left is DynamicWriteResultAggregator, which uses expireAfterWrite semantics:

this.specs =
    Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
this.outputFileFactories =
    Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();

Should we replace that with LRUCache as well?

@mxm
Copy link
Contributor

mxm commented Jun 30, 2025

We can probably replace that one as well. There is nothing inherently different about that cache.

@@ -90,7 +90,6 @@ void testCachingDisabled() {
   TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);

   // Cleanup routine doesn't run after every write
-  cache.getInternalCache().cleanUp();
-  assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
+  assertThat(cache.getInternalCache().size()).isEqualTo(0);
Contributor

assertThat(cache.getInternalCache()).isEmpty()

Contributor

We can use this everywhere. This is a bit more descriptive.

   // Manually clean up because the cleanup is not always triggered
-  keySelectorCache.cleanUp();
-  assertThat(keySelectorCache.estimatedSize()).isEqualTo(1);
+  assertThat(keySelectorCache.size()).isEqualTo(1);
Contributor

assertThat(keySelectorCache).hasSize(1)

Contributor

We can use this everywhere. This is a bit more descriptive.

   assertThat(cacheItem).isNotNull();

   tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
-  assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem);
+  assertThat(cache.getInternalCache().get(tableIdentifier)).isEqualTo(cacheItem);
Contributor

Can we do a contains?

@@ -153,7 +153,6 @@ void testLastResultInvalidation() {
     .isEqualTo(CompareSchemasVisitor.Result.SAME);

   // Last result cache should be cleared
-  assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2))
-      .isNull();
+  assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas().get(SCHEMA2)).isNull();
Contributor

Can we do a doesNotContain?

@pvary
Contributor

pvary commented Jun 30, 2025

> I replaced all Caffeine maximumSize caches with LRUCache in this PR. The only one left is DynamicWriteResultAggregator, which uses expireAfterWrite semantics:
>
> this.specs =
>     Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
> this.outputFileFactories =
>     Caffeine.newBuilder().expireAfterWrite(CACHE_EXPIRATION_DURATION).softValues().build();
>
> Should we replace that with LRUCache as well?

Before we move forward, let's take a step back and consider the size of the cache and the cost of a cache miss. How many items do we need in the cache for optimal operation?

  1. Is this something static that we can configure?
  2. Is this something that should be configured by the user and stays relatively stable for a single job?
  3. Is this something that depends on the workload and grows as the number of tables accessed by the job increases?

This one looks like number 3 to me. Also, the cache is only accessed a few times during a checkpoint, so performance matters less here. So to me this seems better handled by Caffeine, but I could be convinced.

Also, please revisit the previous decisions on the cache sizes and reconsider if appropriate.

Thanks,
Peter

@mxm
Contributor

mxm commented Jun 30, 2025

You are right, Peter: the cache size should probably be proportional to the number of "active" tables, but with a time-based eviction policy we have seen worse performance. That was the reason we switched to max size / LRU. So there should probably be an LRU eviction policy not across ALL tables, but per table. We could extend LRUCache to use a LinkedList per table. We would then have to compare it again to something like Caffeine to decide whether the custom cache implementation is still worth it.
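
A hypothetical sketch of that per-table idea, using a small access-ordered LinkedHashMap per table instead of a LinkedList (all names and the eviction bound are illustrative, not code from this PR):

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative only: bounds LRU eviction within each table's entries
    // rather than across all tables, so one hot table cannot evict another
    // table's metadata.
    class PerTableLruCache<K, V> {
      private final int maxEntriesPerTable;
      private final Map<String, LinkedHashMap<K, V>> byTable = new HashMap<>();

      PerTableLruCache(int maxEntriesPerTable) {
        this.maxEntriesPerTable = maxEntriesPerTable;
      }

      void put(String table, K key, V value) {
        byTable.computeIfAbsent(table, t -> newPerTableMap()).put(key, value);
      }

      V get(String table, K key) {
        LinkedHashMap<K, V> perTable = byTable.get(table);
        return perTable == null ? null : perTable.get(key);
      }

      private LinkedHashMap<K, V> newPerTableMap() {
        // Access-ordered map that evicts its own least recently used entry.
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntriesPerTable;
          }
        };
      }
    }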

@pvary
Contributor

pvary commented Jun 30, 2025

> using a time-based eviction policy, we have seen worse performance

IIUC, we access this cache only a few times per checkpoint: the sum of (table × parallelism for the table). Not very few, but not once per record either. It's probably not worth the extra complexity.
