KAFKA-15084: Remove lock contention from RemoteIndexCache #13850

Merged
divijvaidya merged 12 commits into apache:trunk from indexcache-lock on Jun 21, 2023

Conversation

@divijvaidya (Contributor) commented Jun 13, 2023

Problem

RemoteIndexCache is accessed concurrently from multiple threads in the fetch-from-consumer code path [1].

Currently, RemoteIndexCache internally uses a LinkedHashMap as the cache implementation. Since LinkedHashMap is not a thread-safe data structure, we use a coarse-grained lock on the entire map/cache when writing to it.

This means that if a thread is fetching information for a particular segment from the RemoteStorageManager, other threads trying to access a different segment from the cache will also have to wait for that thread to complete. This is due to the use of a global lock in the cache.

This lock contention leads to a decrease in throughput for fetch-from-consumer in cases where the RSM network call takes more time.

Solution

We need a data structure for the cache which satisfies the following requirements:

  1. Multiple threads should be able to read concurrently.
  2. Fetch for missing keys should not block read for available keys.
  3. Only one thread should fetch for a specific key.
  4. Should support LRU policy.

In Java, all non-concurrent data structures (such as LinkedHashMap) violate condition 2. We could use concurrent data structures such as ConcurrentHashMap, but we would then have to implement LRU eviction ourselves on top of them, or we could implement an LRU cache from scratch that satisfies the above constraints.

Alternatively (the approach taken in this PR), we can use the Caffeine cache, which satisfies all the requirements mentioned above. Caffeine performs better than the Google Guava cache [2] and is used by major open source projects such as Cassandra and HBase, so it is safe to consider it a stable dependency.
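
For illustration, a minimal sketch (not code from this PR) of how Caffeine covers these requirements; the Entry class, the maximum size of 1024 and the fetchFromRemoteStorage loader are hypothetical placeholders:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

import java.util.UUID;

public class CaffeineCacheSketch {
    // Hypothetical stand-in for the real cache entry holding the fetched indexes.
    static final class Entry { }

    public static void main(String[] args) {
        Cache<UUID, Entry> cache = Caffeine.newBuilder()
                .maximumSize(1024) // size-based eviction via Caffeine's Window TinyLFU policy
                .removalListener((UUID key, Entry entry, RemovalCause cause) ->
                        System.out.println("evicted " + key + " because " + cause))
                .build();

        UUID segmentId = UUID.randomUUID();
        // get(key, mappingFunction) runs the loader at most once per key and blocks only
        // callers of that same key; lookups for other keys proceed concurrently.
        Entry entry = cache.get(segmentId, CaffeineCacheSketch::fetchFromRemoteStorage);
        System.out.println("loaded " + segmentId + " -> " + entry);
    }

    // Placeholder for the expensive RemoteStorageManager fetch.
    private static Entry fetchFromRemoteStorage(UUID segmentId) {
        return new Entry();
    }
}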

Changes

  • This PR uses Caffeine as the underlying cache for RemoteIndexCache.
  • The old java.io.File API has been replaced with the java.nio.file.Files API introduced in JDK 7 (a brief sketch follows below).
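
A minimal sketch of the kind of java.nio.file.Files calls involved; the directory and file names below are illustrative, not taken from the PR:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FilesApiSketch {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("remote-index-cache");
        Path tmp = Files.createTempFile(dir, "offset-index", ".tmp");

        // Files.move can request an atomic rename and throws a descriptive IOException on
        // failure, unlike java.io.File#renameTo, which only returns false.
        Path index = dir.resolve("00000000000000000000.index");
        Files.move(tmp, index, StandardCopyOption.ATOMIC_MOVE);

        // Files.deleteIfExists avoids the boolean-return ambiguity of File#delete.
        Files.deleteIfExists(index);
        Files.deleteIfExists(dir);
    }
}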

Testing

  • A test has been added which verifies requirement 2 above. The test fails prior to the change and is successful after it.
  • New tests have been added to improve overall test coverage.

[1]

Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());

[2] https://github.com/ben-manes/caffeine/wiki/Benchmarks

@divijvaidya (Contributor, Author):

@satishd @showuon please take a look at this when you get a chance. We have identified this as a major problem while running Tiered Storage in production.

@showuon (Contributor) commented Jun 15, 2023

I'll check it this week.

@divijvaidya added the tiered-storage (Pull requests associated with KIP-405 (Tiered Storage)) label on Jun 15, 2023
@satishd (Member) commented Jun 15, 2023

I will review this PR in a couple of days.

@satishd (Member) left a comment

Thanks @divijvaidya for the PR.

In our internal production environments, we ran remote read requests for 1000+ partitions with bytes-out of ~500+ MBps per broker and did not see much latency. It also depends on the cache size and how often entries miss the cache. But we observed an impact when there was degradation in our remote storage (HDFS) clusters. We resolved it in a similar fashion to the one proposed in this PR, by removing the global lock. Remote storages like cloud object stores may cause longer latencies and encounter this issue.

One way to address this is to introduce Caffeine's concurrent LFU cache, as done in this PR. But it adds a new dependency to the project.

Another way is to change the current LinkedHashMap to a synchronized map and use a slot-based lock instead of a global lock. This workaround is less performant than Caffeine, but it does not introduce a new dependency.
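
For illustration, a minimal sketch of the slot-based (striped) locking idea mentioned above; the slot count and the key type are assumptions for the example, not from this PR:

import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;

// Lock striping: instead of one global lock, each key hashes to one of a fixed number of
// lock slots, so loads of unrelated keys rarely block each other.
public class StripedLockSketch {
    private static final int SLOTS = 32; // illustrative slot count
    private final ReentrantLock[] locks = new ReentrantLock[SLOTS];

    public StripedLockSketch() {
        for (int i = 0; i < SLOTS; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    // Pick the lock slot for a key; the mask keeps the index non-negative.
    public ReentrantLock lockFor(UUID key) {
        return locks[(key.hashCode() & 0x7fffffff) % SLOTS];
    }

    public static void main(String[] args) {
        StripedLockSketch striped = new StripedLockSketch();
        UUID segmentId = UUID.randomUUID();
        ReentrantLock lock = striped.lockFor(segmentId);
        lock.lock();
        try {
            // fetch the missing entry and insert it into the synchronized LRU map here
        } finally {
            lock.unlock();
        }
    }
}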

+1 on using Caffeine, as it has a well-optimized and reliable concurrent LFU cache. It is used in several other projects built for large scale, like Druid, Cassandra, and Solr. But we need to make sure that it does not bring in a lot of other dependencies.

It seems Caffeine brings in a couple of dependencies, such as com.google.errorprone:error_prone_annotations and org.checkerframework:checker-qual.
@divijvaidya What dependencies does it bring?

@ben-manes commented:

fyi,

Caffeine only depends on annotations for static analysis. This allows users to catch mistakes more easily, e.g. if using a null checker.

Traditionally these could be marked as optional dependencies. Unfortunately, bugs in scalac, javac, Java modules, etc. can cause runtime failures if the annotations are not found on the classpath. While that is a spec violation, it's less risky to include them at compile scope and ask users who are averse to them to exclude the transitives in their build system.

@satishd (Member) commented Jun 16, 2023

Thanks for the clarification @ben-manes.

@divijvaidya (Contributor, Author):

Thank you for looking into this @satishd

But we observed an impact when there is a degradation in our remote storage(HDFS) clusters.

Yes, you are right, the impact of this lock contention is felt in cases when fetching from RSM is slow. We observed something similar.

Another way is to change the current LinkedHashMap to a synchronized map and use a slot-based lock instead of a global lock. This workaround is less performant than Caffeine, but it does not introduce a new dependency.

I understand what you are saying. I did consider that approach. So that we are on the same page, the approach looks like:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.cache.{Cache, LRUCache, SynchronizedCache}

// Thread-safe map that holds one lock per cache key
val entryLockMap = new ConcurrentHashMap[Uuid, ReentrantLock]()

// Internal LRU cache based on LinkedHashMap (via LRUCache), made thread safe by
// synchronising all of its operations with SynchronizedCache (from org.apache.kafka.common.cache)
val internalCache: Cache[Uuid, Entry] = new SynchronizedCache(new LRUCache[Uuid, Entry](maxSize))

def getEntry(key: Uuid): Entry = {
  // return immediately if the key is already available in the cache
  val cached = internalCache.get(key)
  if (cached != null)
    return cached

  // we have to load the entry into the cache: acquire a lock for this entry,
  // fetch it and then update the cache
  val entryLock = entryLockMap.computeIfAbsent(key, _ => new ReentrantLock())

  // acquire exclusive lock on the entry
  entryLock.lock()
  try {
    // after acquiring the lock, check again for the key; another thread may have
    // loaded it while this thread was waiting for the lock
    val existing = internalCache.get(key)
    if (existing != null)
      return existing

    val entry = fetchFromRSM(key)
    // no need to lock the internal cache here since its get/put methods are synchronized
    internalCache.put(key, entry)
    entry
  } finally {
    entryLock.unlock()
  }
}

The advantages of this approach are:

  1. No external dependency
  2. The same cache pattern is used in other places in the Kafka code (see org.apache.kafka.common.cache.SynchronizedCache), although that code was written in 2015, when better alternatives perhaps didn't exist.

The disadvantages of this approach are:

  1. Cache metrics would have to be computed manually.
  2. Cache eviction is done synchronously and hence holds the global cache lock.
  3. In the absence of a lock that ensures fairness, synchronising on the global cache can starve a fetch thread, leading to high tail latencies. This can happen under a heavy get() load, where the lock keeps being granted to get() threads ahead of the put() thread. That is why I am not a big fan of using data structures with coarse-grained global locks.

Caffeine, on the other hand, doesn't have any of the disadvantages mentioned above, is used in high-throughput, low-latency systems like Cassandra, and has a regular release cadence with great support. Hence, I believe that using Caffeine is the right choice here. (I know that you already agreed to this, but I am adding more context here for other readers of the PR.)
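
For illustration (again, not code from this PR), Caffeine addresses the first two points out of the box: recordStats() gives built-in hit/miss/eviction counters, and eviction is handled by the cache's own maintenance rather than by a caller holding a global lock. The cache size below is an arbitrary example value:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;

import java.util.UUID;

public class CaffeineStatsSketch {
    public static void main(String[] args) {
        Cache<UUID, String> cache = Caffeine.newBuilder()
                .maximumSize(2)  // tiny size so that eviction is easy to observe
                .recordStats()   // built-in metrics, no manual bookkeeping needed
                .build();

        for (int i = 0; i < 5; i++) {
            cache.get(UUID.randomUUID(), key -> "entry-" + key);
        }
        cache.cleanUp(); // run pending maintenance so the counters below are up to date

        CacheStats stats = cache.stats();
        System.out.printf("hits=%d misses=%d evictions=%d%n",
                stats.hitCount(), stats.missCount(), stats.evictionCount());
    }
}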

@divijvaidya requested a review from satishd on June 16, 2023 09:10
@showuon (Contributor) left a comment

LGTM! Thanks for the improvement! Left some minor comments.

@@ -117,51 +128,60 @@ class RemoteIndexCacheTest {

@Test
def testCacheEntryExpiry(): Unit = {
  val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
  // close existing cache created in test setup before creating a new one
  Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
Contributor:

If we don't do this at the beginning of the test, will the test fail? I thought we already do this in @AfterEach.

@divijvaidya (Author):

No, it wouldn't fail the test, but it would lead to memory and thread leaks. This is because some tests forcefully assign a new value to the member variable cache. Hence, we end up with two cache instances: one initialized in @BeforeEach and another created by the test itself. If we don't explicitly close the cache initialized in @BeforeEach, it keeps consuming heap and leaves stray threads that won't be released until the process exits.
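
A hypothetical JUnit 5 sketch of the lifecycle pattern described here; FakeCache stands in for RemoteIndexCache, and none of these names come from the PR:

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class CacheLifecycleSketchTest {
    // Stand-in for RemoteIndexCache; it holds resources that must be released.
    static final class FakeCache implements AutoCloseable {
        @Override
        public void close() { /* release heap and stop background threads */ }
    }

    private FakeCache cache;

    @BeforeEach
    void setUp() {
        cache = new FakeCache();
    }

    @Test
    void testWithCustomCacheConfig() {
        // close the setup instance before overriding the member, otherwise it leaks
        cache.close();
        cache = new FakeCache();
        // ... test body using the custom instance ...
    }

    @AfterEach
    void tearDown() {
        cache.close();
    }
}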

Contributor:

OK, got it. Makes sense.

@satishd (Member) commented Jun 19, 2023

I understand what you are saying. I did consider that approach. So that we are on the same page, the approach looks like:
...
Caffeine, on the other hand, doesn't have any of the disadvantages mentioned above, is used in high-throughput, low-latency systems like Cassandra, and has a regular release cadence with great support. Hence, I believe that using Caffeine is the right choice here. (I know that you already agreed to this, but I am adding more context here for other readers of the PR.)

Right, we are on the same page. I mentioned that other possible approach, which avoids adding a dependency, in the same comment, noting that I am not in favour of it because of its cons, including being less performant than Caffeine.

@showuon (Contributor) left a comment

LGTM! Thanks for the improvement!

@divijvaidya (Author):

Rebasing from trunk to fix flaky CI test failures.

@satishd (Member) left a comment

Thanks @divijvaidya for addressing the review comments, LGTM.

@divijvaidya (Author) commented Jun 20, 2023

Thank you @showuon @satishd for your reviews so far. I have added a few more commits to de-flake some tests, improve thread safety for Entry, add new unit tests, and add log statements that help with debugging. The flakiness was caused by how the tests were written earlier (overriding class members with spied variables).

Requesting one last review cycle from you folks!

@showuon (Contributor) commented Jun 21, 2023

@divijvaidya , sorry, which commits should I check?

@divijvaidya (Author):

@divijvaidya , sorry, which commits should I check?

I merged the new changes together into one commit. It's this one: 8513b5f

Comment on lines +270 to +272
inReadLock(lock) {
  val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
  internalCache.get(cacheKey, (uuid: Uuid) => {
@showuon (Contributor) commented Jun 21, 2023

Should we use writeLock here, since the get means:

If the specified key is not already associated with a value, attempts to compute its value using the given mapping function and enters it into this cache unless {@code null}. The entire method invocation is performed atomically, so the function is applied at most once per key. Some attempted update operations on this cache by other threads may be blocked while the computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this cache.

So it's actually updating the value:
https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Cache.java#L60-L65

@divijvaidya (Author):

Yes, it will update the cache with a new entry. But since the internalCache is thread safe, we don't need to prevent other threads from reading or writing another entry in the cache; hence, we don't need to acquire a write lock over the entire RemoteIndexCache.

Does this answer your question?
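
For readers following this thread, a simplified sketch of the locking pattern under discussion (hypothetical, not the PR's exact code): lookups take only the read lock so they can run concurrently, Caffeine's get(key, loader) already serialises loading per key, and the write lock is reserved for operations that must exclude all lookups, such as closing the cache.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Cache<UUID, String> internalCache = Caffeine.newBuilder()
            .maximumSize(1024)
            .build();

    // Many threads may be in this method at once because they only share the read lock;
    // the thread-safe cache still runs the loader at most once per missing key.
    public String getEntry(UUID key) {
        lock.readLock().lock();
        try {
            return internalCache.get(key, k -> "loaded-" + k);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Operations that must exclude all concurrent lookups (e.g. closing the cache)
    // take the write lock instead.
    public void close() {
        lock.writeLock().lock();
        try {
            internalCache.invalidateAll();
        } finally {
            lock.writeLock().unlock();
        }
    }
}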

@showuon (Contributor) left a comment

LGTM!

@divijvaidya (Author):

Unrelated test failures:

[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_17_and_Scala_2_13___testSyncTopicConfigs__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_17_and_Scala_2_13___testBalancePartitionLeaders__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.LagFetchIntegrationTest.shouldFetchLagsDuringRestoration()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.streams.integration/LagFetchIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldFetchLagsDuringRestoration__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
[Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_11_and_Scala_2_13____1__Type_ZK__Name_testNewAndChangedTopicsInDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testSyncTopicConfigs__/)
[Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_8_and_Scala_2_12____1__Type_ZK__Name_testNewAndChangedTopicsInDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT/)
[Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_8_and_Scala_2_12____1__Type_ZK__Name_testNewAndChangedTopicsInDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT_2/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13850/16/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)

@divijvaidya merged commit 88e784f into apache:trunk on Jun 21, 2023
1 check failed
@divijvaidya deleted the indexcache-lock branch on August 15, 2023 14:34
Labels: core, tiered-storage