New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10500: Thread Cache Resizes #9572
Conversation
@cadonna Part 2 |
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the PR, @wcarlson5 !
Here my feedback.
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few nits. Overall LGTM.
} | ||
final long cacheSizePerThread = totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0)); | ||
totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); | ||
final long cacheSizePerThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why move off using hasGlobalTopology
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was in a separate method without access to hasGlobalTopology
. I supposes if it stays we can move it back
@@ -806,6 +803,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, | |||
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); | |||
} | |||
|
|||
private void resizeThreadCache(final int numStreamThreads) { | |||
final long cacheSizePreThread = totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this duplicates L733
. Might be good to extract into a small helper method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did have it in a separate method but helper but when removing the totalCacheSize < 0
check @cadonna thought it would be more readable inline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why the totalCacheSize
check is relevant for avoiding code duplication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was about readability. I might be misremembering though, as it was a conversation we had last week
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this line is duplicated, it should go in a method. When I proposed to move it inline, I was apparently not aware that the same line was used somewhere else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to a new method. Glad we got that cleared up. LGTM?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM?
If this is a question, should it be LGTY? 😂
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
Outdated
Show resolved
Hide resolved
final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values()); | ||
while (sizeBytes() > maxCacheSizeBytes) { | ||
if (!circularIterator.hasNext()) { | ||
log.error("Unable to remove any more entries as all caches are empty"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this ever happen? If we the max cache size is smaller than a single entry, would we not evict the entry and the used cache size would always shrink to zero?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we add a check to make sure the number of threads is positive then probably not. Ill add that check then remove this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. -- I guess the miss-leading fact was, that this check was done inside the while-loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, in retrospect it was not very clear. Hopefully its better this way now
} | ||
|
||
private void resizeThreadCache(final int numStreamThreads) { | ||
if (numStreamThreads < 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be smaller than 0
? Should the test be <= 0
or < 1
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be zero if you have a global thread, but since this is internal the check might not be entirely necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it can be zero, but the check says < 0
, so it would always evaluate to false?
And if we have zero threads, we should not resize the cache as we might end up in an infinite loop? But we would only call this method if we "shrink", ie, if the thread count grows, but it can never grow from negative to zero, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good point. Maybe what we need to do it put a minimum size of cache to limit how many stream threads an instance can have?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, getCacheSizePerThread
would eventually return zero (with growing number of threads), what means that every put() into the cache would result in an immediate eviction. So I don't think we need to do anything for this corner case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is a good point
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
Outdated
Show resolved
Hide resolved
Thanks for the PR @wcarlson5. Merged to |
The thread cache can now be resized. This will go towards being able to scale the number of threads
Committer Checklist (excluded from commit message)