KAFKA-9401: Reduce contention for Fetch requests #15836

Merged
merged 5 commits into apache:trunk from gaurav-narula:KAFKA-9401 on May 11, 2024

Conversation

gaurav-narula
Contributor

KIP-227 introduced in-memory caching of FetchSessions. Brokers with a large number of Fetch requests suffer from contention when trying to acquire the lock on the FetchSessionCache.

This change aims to reduce lock contention for FetchSessionCache by sharding the cache into multiple segments, each responsible for an equal range of sessionIds. Assuming Fetch requests have a uniform distribution of sessionIds, the probability of contention on a segment is reduced by a factor of the number of segments.

We ensure backwards compatibility by ensuring that the total number of cache entries remains the same as configured and that sessionIds are randomly allocated.
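For illustration, here is a minimal, self-contained Scala sketch of the sessionId-range sharding idea; the object name and the shard count of 8 are assumptions for the example, not necessarily the final code.

// Sketch of the sessionId-range sharding scheme (illustrative names, not the exact PR code).
// Each shard owns an equal, contiguous slice of the sessionId space, so lock contention is
// spread across per-shard locks instead of a single cache-wide lock.
object ShardingSketch {
  val numShards = 8                                   // shard count discussed later in this PR
  val sessionIdRange: Int = Int.MaxValue / numShards  // size of each shard's sessionId slice

  // Map an existing sessionId back to the index of the shard that owns it.
  def shardIndex(sessionId: Int): Int = sessionId / sessionIdRange

  def main(args: Array[String]): Unit = {
    println(shardIndex(1))                                // 0
    println(shardIndex(sessionIdRange - 1))               // 0
    println(shardIndex(sessionIdRange))                   // 1
    println(shardIndex(numShards * sessionIdRange - 1))   // 7, the largest sessionId a shard allocates
  }
}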

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@gaurav-narula
Contributor Author

gaurav-narula commented Apr 30, 2024

The following images show lock profiles collected using async-profiler before and after this change, with numCacheShards = 8 and a profiling time of 30s, and demonstrate a significant reduction in contention.

Before: [lock profile image]

After: [lock profile image]

Edit: Corrected numCacheShards = 8 (was incorrectly mentioned as 64 before)

Contributor

@chia7712 chia7712 left a comment

@gaurav-narula thanks for this improvement. two major comments left. PTAL

metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag)
metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag)
Contributor

I'm not sure whether this is allowed. It seems to break the compatibility of the metrics as it adds new tags, which means Kafka users who monitor these metrics would need to update their queries.

Contributor

I may be wrong here, but doesn't changing the tags of an existing metric need a KIP?

Contributor Author

That's a good point. The reason I added the labels was the removeMetric calls. On further investigation, it seems that metricsGroup.removeMetric isn't really needed and we can have a combined metric for all the shards. The lambda that calculates the value for the gauges needs to synchronize on the cache, but it's a small overhead overall since it only runs when metrics are gathered.
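For what it's worth, a combined gauge over all shards could look roughly like the sketch below; the names mirror the snippets in this PR (assuming a sequence of cache shards named cacheShards, as in later revisions), and the lambdas are only evaluated when metrics are sampled, which is the small overhead mentioned above.

// Illustrative sketch only: single, untagged gauges that aggregate across shards.
metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS,
  () => cacheShards.map(_.size).sum)
metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED,
  () => cacheShards.map(_.totalPartitions).sum)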

Contributor

Yes, the metric would be KIP-able. But you have decided you don't need the tag :)

KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
// for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
val sessionIdRange = Int.MaxValue / config.numIoThreads
Contributor

pardon me. what happens when users update numIoThreads dynamically?

Contributor Author

Using config.numIoThreads is a heuristic, and I reckon it's okay if the shard count isn't modified dynamically along with numIoThreads.

In fact, I realised my "After" benchmark was with 8 shards and not 64, as I had a misconfiguration while running it, which goes to show that even some sharding goes a long way. How about we introduce a constant for now and set it to 8?

Member

Since the number of shards need not depend on the thread pool size, I agree it may be less confusing to keep config.numIoThreads out of this.

Contributor

+1 on keeping the number of threads out of this.

How about we introduce a constant for now and set it to 8?

Out of curiosity are you planning later to make this constant a configuration?

Contributor Author

Out of curiosity are you planning later to make this constant a configuration?

I think it's unlikely one would need to tweak this often. My experiment ran on a fairly busy cluster with ~50k Fetch requests per second (kafka.network-RequestMetrics-RequestsPerSec(request=Fetch)) on the broker, which is reasonably high.

Contributor

I would tend not to add a configuration for this. The value you're talking about sounds like it's doing the job on a busy workload, and it's small enough that there's negligible benefit of configuring it smaller for a tiny cluster. Having a configuration kind of crystallizes this aspect of the internal design of Kafka, and you might have an even better idea in the future that would make this configuration pointless.

Contributor

@OmniaGM OmniaGM left a comment

Thanks for the PR. I left a few comments, but I echo the same comments @chia7712 mentioned.

// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
// for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
val sessionIdRange = Int.MaxValue / config.numIoThreads
val fetchSessionCaches = Range(0, config.numIoThreads)
Contributor

0 to config.numIoThreads is usually preferred in Scala over Range(...)

Contributor

And, same as @chia7712's comment before: config.numIoThreads is listed as a dynamic broker config, so what would happen when this changes?

@@ -430,6 +439,9 @@ class FullFetchContext(private val time: Time,
}
cachedPartitions
}
// We select a shard randomly out of the available options
val shard = ThreadLocalRandom.current().nextInt(caches.size)
val cache = caches.apply(shard);
Contributor

@OmniaGM OmniaGM May 1, 2024

you don't need apply, caches(shard) is enough


def getShardedCache(sessionId: Int): FetchSessionCache = {
val shard = sessionId / caches.head.sessionIdRange
caches.apply(shard)
Contributor

same here about apply

// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
// for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
val sessionIdRange = Int.MaxValue / config.numIoThreads
val fetchSessionCaches = Range(0, config.numIoThreads)
Contributor

same comment about Range vs to

Contributor Author

I've used until because I needed an exclusive Range
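For reference, the difference between the two forms in Scala:

0 until 8   // 0, 1, ..., 7 — exclusive upper bound, equivalent to Range(0, 8)
0 to 8      // 0, 1, ..., 8 — inclusive upper bound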

// Given
val time = new MockTime()
val sessionIdRange = Int.MaxValue / 8
val caches = Range(0, 8).map(shardNum => new FetchSessionCache(10, 1000, sessionIdRange, shardNum))
Contributor

same as above

Contributor

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. It's good that you're improving this area.


@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
*
* @param maxEntries The maximum number of entries that can be in the cache.
* @param evictionMs The minimum time that an entry must be unused in order to be evictable.
*/
* @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).
Contributor

I know the [ , ) notation means what you intend for inclusive/exclusive bounds, but the presence of the parentheses makes it a bit hard to read I think. Maybe using >= and < would be clearer.
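For example, the param doc could be reworded along these lines (just a sketch of the suggested form):

* @param sessionIdRange The number of sessionIds each cache shard handles. A shard handles sessionIds s
*                       where s >= Math.max(1, shardNum * sessionIdRange) and s < (shardNum + 1) * sessionIdRange.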

* @param reqMetadata The request metadata.
* @param fetchData The partition data from the fetch request.
* @param usesTopicIds True if this session should use topic IDs.
* @param isFromFollower True if this fetch request came from a follower.
*/
class FullFetchContext(private val time: Time,
private val cache: FetchSessionCache,
private val caches: Seq[FetchSessionCache],
Contributor

cacheShards?

*/
* @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).
* @param shardNum Identifier for this shard.
*/
class FetchSessionCache(private val maxEntries: Int,
Contributor

Maybe the class name ought to be FetchSessionCacheShard too. The cache really is the whole set of shards.

Contributor Author

Thanks for the suggestion! I've renamed the existing type to FetchSessionCacheShard and FetchSessionCache is now essentially a wrapper around Seq[FetchSessionCacheShard]. This conveys the intention clearly indeed.


def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache))

def getShardedCache(sessionId: Int): FetchSessionCache = {
Contributor

getCacheShard?

@gaurav-narula gaurav-narula force-pushed the KAFKA-9401 branch 2 times, most recently from 4a78018 to 1396c70 on May 3, 2024 20:19
Contributor

@chia7712 chia7712 left a comment

@gaurav-narula nice improvement. a couple of comments left. PTAL

@@ -430,7 +438,10 @@ class FullFetchContext(private val time: Time,
}
cachedPartitions
}
val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
// We select a shard randomly out of the available options
val shard = ThreadLocalRandom.current().nextInt(cache.size())
Contributor

It seems both size and apply are used to randomly pick up a FetchSessionCacheShard. Maybe we can add a method getRandomCacheShard to FetchSessionCache.
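A possible shape for such a method, as a sketch only (getRandomCacheShard is the suggested, not yet existing, name; field names are illustrative):

import java.util.concurrent.ThreadLocalRandom

// Sketch: keep random shard selection inside FetchSessionCache so callers
// don't need to touch size/apply directly.
def getRandomCacheShard: FetchSessionCacheShard = {
  val shard = ThreadLocalRandom.current().nextInt(cacheShards.size)
  cacheShards(shard)
}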

class FetchSessionCacheShard(private val maxEntries: Int,
private val evictionMs: Long,
val sessionIdRange: Int = Int.MaxValue,
private val shardNum: Int = 0) extends Logging {
Contributor

Should FetchSessionCacheShard define the logIdent to help users distinguish the source of a log message? For example, LogCleaner uses the thread id as the logIdent:

this.logIdent = s"Cleaner $id: "
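By analogy, the shard could set something along these lines (hypothetical wording, not the exact line in this PR):

this.logIdent = s"[FetchSessionCacheShard shardNum=$shardNum] "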

val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
// We select a shard randomly out of the available options
val shard = ThreadLocalRandom.current().nextInt(cache.size())
val cacheShard = cache(shard);
Contributor

please remove ;

val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
.map(shardNum => new FetchSessionCacheShard(
config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,
Contributor

The behavior gets changed a bit here. max.incremental.fetch.session.cache.slots is used to control the "total" number of cached sessions. With this change, however, a shard could be full even though the "total" size has not reached the threshold defined by max.incremental.fetch.session.cache.slots.

This is not a big issue, and we can update the docs of max.incremental.fetch.session.cache.slots for that behavior change.

Contributor Author

That's a great point and it's quite subtle. I reckon this may happen because the cacheShards are picked randomly and it can be avoided by picking shards in round-robin. I'll make this change along with addressing the other comments 👍

Some subtle differences cannot be avoided, particularly around eviction. The KIP considers all existing sessions when choosing a candidate for eviction, while this change considers only the existing sessions within a shard. I'll update the documentation to call out the difference.

@gaurav-narula gaurav-narula force-pushed the KAFKA-9401 branch 2 times, most recently from 2b889d6 to 59f3eaa on May 4, 2024 16:34
Contributor

@chia7712 chia7712 left a comment

@gaurav-narula thanks for the updated PR. Please take a look at the following comments.

@@ -690,6 +702,10 @@ class FetchSessionCache(private val maxEntries: Int,
* 2. B is considered "stale" because it has been inactive for a long time, or
* 3. A contains more partitions than B, and B is not recently created.
*
* Prior to KAFKA-9401, the session cache was not sharded and we looked at all
Contributor

This doc is great. Could you please update this one also?

val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
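One possible wording for the updated doc, purely as a sketch (the exact text is up to the PR):

val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of total incremental fetch sessions that we will maintain. " +
  "The session cache is sharded and this limit is divided equally among the shards; eviction decisions are made " +
  "per shard, so a session may be evicted from a full shard even if the total number of sessions is below this limit."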


// Returns the shard in round-robin
def getNextCacheShard: FetchSessionCacheShard = {
val shardNum = (FetchSessionCache.counter.getAndIncrement() % size).toInt
Contributor

As an int is enough for this case, maybe we can use AtomicInteger?

Contributor Author

I used AtomicLong to practically rule out an overflow, but then found Utils.toPositive, which is used by RoundRobinPartitioner :) Updated to use an AtomicInteger and also added some tests to ensure round-robin allocation.
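For reference, a sketch of the resulting round-robin selection (names mirror the snippets in this PR; treat it as illustrative rather than the exact merged code):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.common.utils.Utils

// Sketch: hand out shards in round-robin; Utils.toPositive masks the sign bit so
// a counter wrap-around on overflow still yields a non-negative index.
private val counter = new AtomicInteger(0)

def getNextCacheShard: FetchSessionCacheShard = {
  val shardNum = Utils.toPositive(counter.getAndIncrement()) % cacheShards.size
  cacheShards(shardNum)
}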

FetchSessionCache.metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => cacheShards.map(_.totalPartitions).sum)

def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
val shard = sessionId / cacheShards.head.sessionIdRange
Contributor

It assumes cacheShards is sorted by shardNum, right? If so, could you please add a comment for it?
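Something like the following comment would make the ordering assumption explicit (sketch; the wording is just a suggestion):

def getCacheShard(sessionId: Int): FetchSessionCacheShard = {
  // cacheShards is ordered by shardNum, i.e. cacheShards(i).shardNum == i, so dividing a
  // sessionId by the uniform sessionIdRange yields the index of the shard that owns it.
  val shard = sessionId / cacheShards.head.sessionIdRange
  cacheShards(shard)
}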

Contributor

@chia7712 chia7712 left a comment

LGTM

@gaurav-narula
Contributor Author

Resolved conflict with trunk

@chia7712
Contributor

chia7712 commented May 7, 2024

@gaurav-narula Could you please rebase the code to trigger QA again? It seems we have thread leaks in some tests. That is unrelated to this PR, as I had observed it before.

KIP-227 introduced in-memory caching of FetchSessions. Brokers with
a large number of Fetch requests suffer from contention on trying
to acquire a lock on FetchSessionCache.

This change aims to reduce lock contention for FetchSessionCache
by sharding the cache into multiple segments, each responsible for
an equal range of sessionIds. Assuming Fetch requests have a uniform
distribution of sessionIds, the probability of contention on a segment
is reduced by a factor of the number of segments.

We ensure backwards compatibility by ensuring total number of cache
entries remain the same as configured and sessionIds are randomly
allocated.
Uses AtomicInteger after ensuring we don't overflow. Updated comments
as suggested in review comments. Added tests for ensuring round-robin
allocation.
@soarez
Member

soarez commented May 7, 2024

:core:test timed out only on JDK 8 and Scala 2.12. Restarted the build

@chia7712
Contributor

chia7712 commented May 9, 2024

@gaurav-narula Could you take a look at those failed tests? I feel they are unrelated to this PR, so +1

Member

@soarez soarez left a comment

Checked all failed tests, and they all pass locally. If @gaurav-narula can also confirm they're unrelated to the changes I can merge this.

@gaurav-narula
Contributor Author

Thanks for the review! I'm fairly convinced these failures are unrelated. The report on Gradle Enterprise suggests the failed tests are flaky. Please refer to the following links for the flakiness reports.

https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.test=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.trogdor.coordinator.CoordinatorTest&tests.test=testTaskRequestWithOldStartMsGetsUpdated()
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.test=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.api.ConsumerBounceTest&tests.test=testSeekAndCommitWithBrokerFailures()
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest&tests.test=testProduceConsumeViaAssign(String)[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.test=testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.api.SaslSslConsumerTest&tests.test=testCoordinatorFailover(String, String)[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.api.SaslSslConsumerTest&tests.test=testCoordinatorFailover(String, String)[2]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.api.SaslSslConsumerTest&tests.test=testCoordinatorFailover(String, String)[3]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.api.SaslSslConsumerTest&tests.test=testCoordinatorFailover(String, String)[4]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.server.DelegationTokenRequestsTest&tests.test=testDelegationTokenRequests(String)[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.tools.GetOffsetShellTest&tests.test=testTopicPartitionsArgWithInternalIncluded()[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.tools.GetOffsetShellTest&tests.test=testTopicPartitionsArg()[1]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.tools.TopicCommandIntegrationTest&tests.test=testDescribeWithDescribeTopicPartitionsApi(String)[2]
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest&tests.test=testSyncTopicConfigs()
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=kafka.api.PlaintextConsumerAssignorsTest&tests.test=testRemoteAssignorRange(String%2C%20String)%5B1%5D
https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.tools.TopicCommandIntegrationTest&tests.test=testDescribeWithDescribeTopicPartitionsApi(String)[2]

@chia7712 chia7712 merged commit 47841e0 into apache:trunk May 11, 2024
1 check failed
@chia7712
Contributor

@gaurav-narula Please file a PR for branch 3.7 if you feel this one needs to be backported :)

gaurav-narula added a commit to gaurav-narula/kafka that referenced this pull request May 11, 2024
Reviewers: Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024