
KAFKA-12648: Enforce size limits for each task's cache #11278

Closed
wants to merge 1 commit into from

Conversation

wcarlson5
Contributor

make max buffer cache settable for a named topology

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@wcarlson5
Contributor Author

@ableegoldman here is my idea for cache.max.bytes.buffering for named topologies. I will need to make a few changes once #11272 gets in but the main idea is here. Can you give it a look?

cc/ @guozhangwang

Contributor

@ableegoldman left a comment


One thing I was thinking about is how to handle the case in which the user has explicitly set a cache size in the global configs but also overrode it for only a subset of NamedTopologies. Should we just consider this a topology-level config only and throw (or log) a warning if the user sets cache size in global configs when using named topologies? Or should we consider that global size as an overall upper bound still, and only set/resize specific NamedCaches when their corresponding topology sets a cache size override?

@@ -247,4 +247,6 @@ default boolean commitRequested() {
* @return This returns the time the task started idling. If it is not idling it returns empty.
*/
Optional<Long> timeCurrentIdlingStarted();

long maxBuffer();
Contributor


Should probably specify what kind of buffer in the name (esp. with KIP-770 adding another relevant buffer type)

@@ -43,7 +44,7 @@
// internal stats
private long numPuts = 0;
private long numGets = 0;
- private long numEvicts = 0;
+ private AtomicLong numEvicts = new AtomicLong(0);
Contributor


Why make this atomic? We're still only ever evicting/accessing this from the actual StreamThread, right?

Comment on lines +109 to +113
while (sizeBytes() > maxCacheSizeBytes) {
final NamedCache cache = circularIterator.next();
cache.evict();
numEvicts.incrementAndGet();
}
Contributor


nit: we do this same thing in the other #resize for thread count changes, can you factor it out into a helper method? Then I think we can narrow the scope and make only that helper synchronized (should double check that though)
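For illustration, a minimal sketch of the kind of helper this suggestion seems to point at, assuming the fields shown in the snippet above (sizeBytes(), maxCacheSizeBytes, circularIterator, numEvicts); this is not the actual Kafka Streams code:

// Hypothetical helper sketched from the snippet above -- not the actual PR code.
// Evicts from the named caches round-robin until total size is back under the
// limit; keeping the loop in one synchronized method narrows the locking scope.
private synchronized void maybeEvictUntilWithinLimit() {
    while (sizeBytes() > maxCacheSizeBytes) {
        final NamedCache cache = circularIterator.next();
        cache.evict();
        numEvicts.incrementAndGet();
    }
}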

}
}
}
if (caches.values().isEmpty()) {
Contributor


Any reason this checks emptiness of caches.values() instead of caches.keys()?

@@ -502,7 +504,8 @@ public StreamThread(final Time time,
this.assignmentErrorCode = assignmentErrorCode;
this.shutdownErrorHook = shutdownErrorHook;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
this.threadCache = threadCache;
cacheSizes = new ConcurrentHashMap<>();
Contributor


Does this need to be a concurrent map? Seems to only be accessed by the StreamThread itself

Contributor

@guozhangwang left a comment


One thing I was thinking about is how to handle the case in which the user has explicitly set a cache size in the global configs but also overrode it for only a subset of NamedTopologies. Should we just consider this a topology-level config only and throw (or log) a warning if the user sets cache size in global configs when using named topologies? Or should we consider that global size as an overall upper bound still, and only set/resize specific NamedCaches when their corresponding topology sets a cache size override?

I think this is a key question we'd need to discuss first, before diving into the details.

My current thoughts are that overriding the cache/buffer size per topology may not be straightforward operationally, since a single KS runtime may or may not host tasks of a certain named topology at all, while the purpose of such configs is to bound the total on-heap memory usage a KS instance would take. If we allow overrides at the per-topology level, then a single KS runtime's total memory bound may fluctuate over time depending on whether it gets certain topologies' tasks after a rebalance, which would be very hard to operate; similarly, if we just apply the global bound regardless of whether certain topologies are assigned to a host instance, that would also be hard for users to understand. My gut feeling is that if a user does want to specify a certain "priority" such that, when a KS instance and its threads host tasks of multiple topologies, some of the hosted topologies' tasks should have priority in consuming the memory (or even CPU, as we discussed out of scope of this PR), then they'd be better off with a brand new config for that.

As for V1, I think to keep things simple we can consider two options:

  1. Do not allow per-topology overrides at all at the KS level; if users do override per-query at the KSQL layer, assign them to a different KS runtime, similar to EOS. Of course this means less effective runtime sharing.
  2. We simply take the total memory bound as MAX(global config, topology1 override, topology2 override, ...) and the per-thread cache is still set as total-memory / num-threads. That means if, say, total-memory = MAX(..) = topology N's override, then even if no tasks of topology N are assigned to that thread, or even to that instance at all, we still take the "benefit" of topology N when setting the total memory.

In the long run, I believe that if users want to specify things like "let a specific query use more resources compared to other queries", we should let them use a separate "priority" config rather than leveraging the existing configs. And that's why I'm slightly leaning towards 2) above.
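To make option 2 concrete, a minimal sketch of the arithmetic (class and identifier names here are illustrative, not Kafka's actual config API):

// Hypothetical illustration of option 2 -- not Kafka's actual API or field names.
import java.util.Map;

final class CacheBoundSketch {
    // The effective instance-wide bound is MAX(global config, every topology override).
    static long effectiveTotalBytes(final long globalCacheMaxBytes,
                                    final Map<String, Long> topologyOverrides) {
        long total = globalCacheMaxBytes;
        for (final long override : topologyOverrides.values()) {
            total = Math.max(total, override);
        }
        return total;
    }

    // Each StreamThread still gets an equal share of that bound.
    static long perThreadCacheBytes(final long effectiveTotalBytes, final int numStreamThreads) {
        return effectiveTotalBytes / numStreamThreads;
    }
}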

@@ -233,6 +234,14 @@ public boolean hasNamedTopologies() {
return !builders.containsKey(UNNAMED_TOPOLOGY);
}

/**
* @return true iff the app is using named topologies, or was started up with no topology at all
* and the max buffer was set for the named topologies
Contributor


This may be related to @ableegoldman's meta question: do we set maxBufferSize = true in the future if one of the named topologies has it overridden, or only when all topologies inside the TopologyMetadata have this config overridden?

for (final Task task : tasks.allTasks()) {
tasksTotalMaxBuffer.put(
task.id().topologyName(),
task.maxBuffer() + tasksTotalMaxBuffer.getOrDefault(task.id().topologyName(), 0L)
Contributor


Not sure I understand this logic: why do we add these two values to update tasksTotalMaxBuffer? How would task.maxBuffer() be inferred in the future? Since for now they are only 0, I cannot tell how this would impact the update logic.

@wcarlson5
Contributor Author

@guozhangwang I think there are definitely questions to be answered about this, and really about what we want the user to achieve. I think cache.max.bytes.buffering is really being used for two purposes, and perhaps we can split it into 2 different things that will give a better result: it bounds the heap space, and it reserves an amount of space for each thread.
It might make sense to have users set a bound on the heap for the cluster. Then each topology could reserve some fraction of that space (either as a percentage or a number of bytes), and any unclaimed space can be split among the tasks (or just the tasks that have not reserved space). I think this would clear up some confusion about what this config is for. WDYT? cc/ @ableegoldman

@guozhangwang
Contributor

@guozhangwang I think there are definitely questions to be answered about this, and really about what we want the user to achieve. I think cache.max.bytes.buffering is really being used for two purposes, and perhaps we can split it into 2 different things that will give a better result: it bounds the heap space, and it reserves an amount of space for each thread.
It might make sense to have users set a bound on the heap for the cluster. Then each topology could reserve some fraction of that space (either as a percentage or a number of bytes), and any unclaimed space can be split among the tasks (or just the tasks that have not reserved space). I think this would clear up some confusion about what this config is for. WDYT? cc/ @ableegoldman

Hey @wcarlson5, sorry I'm late replying here. I think at a high level the two purposes you propose make sense to me; it's just that in practice it may be hard to implement them in a simple and elegant way. Just to throw out some scenarios to deep-dive on here, say the instance's total cache size is configured as 100, with no topology added yet.

  1. If a topology A is added with that config overridden as 150, should we just silently accept it but only give it 100, or should we reject that topology?
  2. Assume we accept A as in 1) above. If a topology B without a config override is added, how should we distribute the cache between the two topologies? If we just consider B as 100 (the instance's config value), should we distribute the cache as 60:40 among A and B? And if another C with a config override of 250 is added, should we redistribute the total cache size as 30:20:50 among A / B / C?
  3. Assume we reject A as in 1) above, and suppose B without a config override is added first, which would get all of 100; then later D with a config override of 40 is added. Would we distribute 100 as 30:70 (i.e. D first gets 40, and then B/D split the remaining 60)?
  4. Assuming that we have multiple threads, do we dynamically change the cache size allocated to each thread upon rebalance, based on which tasks of A / B / C each thread hosts?

Thinking about all that, I'm a bit concerned that this config would be very hard for users to understand: it seems that either way "you do not get what you specified", and the overridden value would just be used as a relative ratio. Plus, if we distribute the cache among threads according to the topologies' specified distribution, that would get even more cumbersome to understand.
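To spell out the "relative ratio" concern with the numbers from scenario 2 above (purely illustrative):

// Worked example of scenario 2: the requested sizes only act as ratios against
// a fixed instance total of 100, so nobody gets what they asked for.
public final class RelativeRatioExample {
    public static void main(final String[] args) {
        final long instanceTotal = 100;
        final long[] requested = {150, 100, 250}; // A's override, B (instance value), C's override
        long sum = 0;
        for (final long r : requested) {
            sum += r;
        }
        for (final long r : requested) {
            System.out.println(instanceTotal * r / sum); // prints 30, 20, 50
        }
    }
}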

@wcarlson5
Contributor Author

@guozhangwang That is a really good point. Maybe we should be more specific. What if each topology could request a percentage of the total cache? If a request made the total exceed 99%, the request would be rejected. Any unclaimed cache would be split among the topologies that did not claim any. A topology could lower its cache size if it wants to make space for a new topology.

  1. If A requests 50% it gets it and the rest is unused
  2. B joins but does not request, so it gets the other 50%
  3. C joins but requests 75%, so it fails. C then requests 25%, so now A has 50%, B 25%, and C 25%
  4. D joins without a request, so now A has 50%, B 12%, C 25%, and D 13%
  5. A reduces its request to 25%, so now all have 25%
  6. E joins and requests 0%, using no cache, and all other topologies are unchanged

I think that should work. Now we have both an upper bound on total memory and a minimum guarantee.
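Sketching that allocation rule in code, just to check the arithmetic in the scenarios above (a hypothetical helper class; none of this is in the PR):

// Hypothetical sketch of the percentage-request proposal above -- not part of this PR.
import java.util.LinkedHashMap;
import java.util.Map;

final class CachePercentageSketch {
    // null value = topology made no request; 0 = explicitly requested no cache (like E above)
    private final Map<String, Integer> requestedPercent = new LinkedHashMap<>();

    // Reject a request that would push the reserved total past the 99% threshold.
    boolean addTopology(final String name, final Integer percent) {
        if (percent != null && reservedPercent() + percent > 99) {
            return false;
        }
        requestedPercent.put(name, percent);
        return true;
    }

    private int reservedPercent() {
        int sum = 0;
        for (final Integer p : requestedPercent.values()) {
            sum += p == null ? 0 : p;
        }
        return sum;
    }

    // Topologies with an explicit request get exactly that; the unclaimed space is
    // split evenly among the topologies that made no request at all.
    Map<String, Double> effectivePercentages() {
        long noRequest = 0;
        for (final Integer p : requestedPercent.values()) {
            if (p == null) {
                noRequest++;
            }
        }
        final double shareOfUnclaimed = noRequest == 0 ? 0.0 : (100.0 - reservedPercent()) / noRequest;
        final Map<String, Double> result = new LinkedHashMap<>();
        for (final Map.Entry<String, Integer> e : requestedPercent.entrySet()) {
            result.put(e.getKey(), e.getValue() == null ? shareOfUnclaimed : (double) e.getValue());
        }
        return result;
    }
}

Running the scenarios through this gives A 50, B 25, C 25 after step 3, and B/D at 12.5 each after step 4, matching the rounded 12/13 above.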

@guozhangwang
Contributor

I think that should work. Now we have both an upper bound on total memory and a minimum guarantee.

Yeah, I feel more comfortable with this proposal :) Basically I think that instead of defining the config as an "override" of the per-application config, we should just consider having a separate config at the per-topology level (e.g. your proposed one based on a percentage of the per-application total). In that way users can specify clearly what they want, and get exactly what they specified.

@wcarlson5 wcarlson5 closed this Sep 30, 2022