Describe the bug
When using a SourceToScheduleType::NonSharded source (e.g., Kafka), the current implementation of the indexing scheduler seems to systematically collocate all pipelines of a given source on the same indexer. For the Kafka source, this prevents distributing the indexing load of a given topic across indexers.
Note that this problem was already reported here. The proposed solution of setting a small cpu_capacity does not work because the scheduler scales the capacities to fit the workload before assigning the pipelines to the nodes.
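To make that note concrete, here is a minimal Rust sketch of the scaling step; the function and representation are hypothetical and only mimic the behavior described, not the actual control-plane code:

```rust
// Hypothetical sketch of the capacity-scaling step described above.
// If the declared capacities cannot fit the total workload, every node's
// capacity is scaled up by the same factor before placement, so uniformly
// lowering cpu_capacity does not change the relative capacities and
// therefore does not change the assignment.
fn scale_capacities(node_capacities_millicpus: &mut [u32], total_load_millicpus: u32) {
    let total_capacity: u32 = node_capacities_millicpus.iter().sum();
    if total_capacity < total_load_millicpus {
        let scale = total_load_millicpus as f64 / total_capacity as f64;
        for capacity in node_capacities_millicpus.iter_mut() {
            *capacity = (*capacity as f64 * scale).ceil() as u32;
        }
    }
}

fn main() {
    // Four 8-CPU nodes (8000 millicpus each) facing a 60-CPU workload:
    // capacities are scaled up ~1.9x. Declaring four 1-CPU nodes instead
    // would yield the same relative capacities, hence the same placement.
    let mut capacities = [8_000u32; 4];
    scale_capacities(&mut capacities, 60_000);
    println!("{capacities:?}"); // [15000, 15000, 15000, 15000]
}
```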
Steps to reproduce (if applicable)
See test in comments.
Expected behavior
Pipelines with high throughputs should be more or less evenly distributed across indexers.
Possible solutions
1) measure the actual load of each Kafka source (currently hardcoded to 4 CPUs) and use that for scheduling. This increases the risk of entering a rebalancing ping-pong between the control plane and the Kafka rebalancing protocol.
2) for each source, first limit the maximum number of pipelines that can be assigned to each node according to its unscaled, original capacity.
3) (variant of 2) re-introduce a source parameter like max_num_pipelines_per_indexer so that users can at least manually force the distribution of the load of given sources/topics across nodes. This parameter would be pretty hard to configure properly (and hard to maintain for fluctuating workloads).
EDIT:
4) add a "num cpu per pipeline" parameter to the source, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.
5) (variant of 4) add an "average data rate" parameter to the source, which would have the same effect as "num cpu per pipeline" but would be easier for the user to configure; Quickwit would internally convert the bandwidth to CPUs, as sketched below.
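For illustration, a hedged Rust sketch of the conversion solution 5 implies; the names and the 20 MB/s full-pipeline throughput are assumptions, not Quickwit's actual constants or API:

```rust
// Hypothetical conversion from a user-provided expected throughput to a CPU
// allocation used for scheduling, per solution 5 above.
const PIPELINE_FULL_CAPACITY_MILLICPUS: u32 = 4_000; // one pipeline ~ 4 CPUs
const PIPELINE_FULL_THROUGHPUT_BYTES_PER_SEC: u64 = 20_000_000; // assumed ratio

fn millicpus_for_expected_throughput(expected_bytes_per_sec: u64) -> u32 {
    let ratio =
        expected_bytes_per_sec as f64 / PIPELINE_FULL_THROUGHPUT_BYTES_PER_SEC as f64;
    // Clamp between a small floor and a full pipeline's worth of CPU.
    ((PIPELINE_FULL_CAPACITY_MILLICPUS as f64 * ratio).ceil() as u32)
        .clamp(250, PIPELINE_FULL_CAPACITY_MILLICPUS)
}

fn main() {
    // A 100 KB/s topic would reserve only the floor instead of a hardcoded
    // 4 CPUs, letting many light pipelines share one indexer.
    assert_eq!(millicpus_for_expected_throughput(100_000), 250);
    assert_eq!(millicpus_for_expected_throughput(20_000_000), 4_000);
}
```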
Configuration:
Main (but same behavior in 0.8).
Activity
rdettai commented on Apr 15, 2025
Rationale behind the current behavior:
[Issue retitled from "Indexing scheduling broken for Kafka source" to "Indexing scheduling unbalanced for Kafka source"]

rdettai commented on Apr 16, 2025
Here is an interesting behavior of the scheduler.
Let's assign one source with four pipelines to a cluster of 4 nodes (8 CPUs each):
This shows that when the capacity is not exceeded, pipelines are evenly balanced across nodes. According to the principle stated above, we should instead have collocated at least 2 pipelines per indexer:
Now if we increase the number of sources to 4 (same cluster capacity):
Each source gets all its partitions assigned to a single node.
hardboiled commented on Apr 22, 2025
Hey @rdettai, would there be a benefit in making some sort of affinity config for pipelines? If you know that some topics will be pushing a lot more data than others, you could instruct Quickwit indexers to share the load more; whereas, if the topic ingest variance is pretty normalized, the dev would just leave the default config.
rdettai commented on Apr 23, 2025
Do you have a specific UX in mind? One option would be to specify the expected bandwidth for the topic, but I feel it would be "stressful" for users who know that their workloads vary a lot over time.
The benefit of this PR is that it doesn't add an extra configuration, and it does effectively allow spreading the load. For instance, if you have 8-CPU nodes, setting a source to num_pipeline=8 would spread it across up to 4 nodes (a pipeline is 4 CPUs, so the load starts spreading as soon as a source has more than 2 pipelines).

fulmicoton-dd commented on Apr 23, 2025
I think the ticket does not really finish describing the problem.
The reason we are collocating pipelines from the same index on the same indexer is to get better pruning at search time.
This is not a bug: this is a feature.
The purpose of the cpu capacity parameter on indexers is to act as a throttle: we accept some load imbalance precisely when it helps improve this pruning. Setting it to 0 has the effect of favoring balance.
Unfortunately, this balance is only measured in terms of pipelines. Quickwit has a mechanism to assess the throughput of a pipeline, but we never use it for Kafka: we judged that, even more than ingest v2, Kafka is subject to alternating between catching up on a backlog and indexing at the horizon.
So the issue with collocating pipelines from the same index on the same indexer is that the load of Kafka pipelines can be very unequal, which leads to an imbalanced workload across the different nodes.
This problem is not caused by that behavior; it is exacerbated by it.
I advise against solution 1 for the reason already discussed.
Solution 2 only partially solves the problem (e.g., 4 nodes, 3 indexes each with 1 pipeline indexing at 20 MB/s, and 12 indexes with a throughput of 100 KB/s: since every pipeline counts the same, a per-node pipeline cap can still pack the heavy pipelines onto one node).
For exhaustiveness: right now, I believe we do not know how to run two merges concurrently, so the merge pipeline could rapidly end up being the bottleneck if we had ~3 indexing pipelines for the same index on the same node.
I cannot remember whether the control plane already has logic preventing this.
I would add:
Solution 4: add a "num cpu per pipeline" parameter, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.
daniele-br commented on Apr 28, 2025
OK, but just to be clear, we're in the opposite situation: we have 1-2 topics that carry 70% of the data and a lower amount of data on the other 20 topics. Not sure what implications that has.
fulmicoton-dd commented on Apr 29, 2025
@daniele-br This was my understanding. You would then have to mark all the topics with a lower amount of data as having a lower amount of data. I assume your comment was about reporting the oddity of having to "special case" the "majority".
rdettai commented on Apr 29, 2025
@fulmicoton-dd Yes, it does not address this specific workload.
Should the parameter be expected_cpus_per_pipeline: CpuCapacity or expected_throughput_per_pipeline: ByteSize? Exposing the config in terms of CPU implies that the user is more aware of the internals of how pipelines use CPUs; it makes sense if we want to force users to dig deeper before tweaking this. Exposing the config as an expected bandwidth is closer to the user's perspective, so it is easier for them to understand.
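For concreteness, a sketch of what the two candidate knobs could look like on the Kafka source parameters; neither field exists in Quickwit today, and the types are stand-ins:

```rust
// Stand-in types for the sketch (Quickwit has its own CpuCapacity type, and
// ByteSize comes from the bytesize crate).
struct CpuCapacity(u32); // millicpus, e.g. 4000 = 4 CPUs
struct ByteSize(u64);    // bytes, e.g. 5_000_000 ~ 5 MB

// Hypothetical additions to the Kafka source parameters.
struct KafkaSourceParams {
    // ...existing fields (topic, client params, ...)...

    // Option A: expressed in scheduler units; leaks pipeline internals.
    expected_cpus_per_pipeline: Option<CpuCapacity>,
    // Option B: expressed as a data rate; closer to what the user observes,
    // converted to CPUs internally.
    expected_throughput_per_pipeline: Option<ByteSize>,
}
```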
fulmicoton-dd commented on May 14, 2025
expected_throughput_per_pipeline: ByteSize is probably best.

fulmicoton-dd commented on May 14, 2025
For 2), not really. If it is not there yet, I would like to have a non-configurable maximum number of pipelines for a given index, set to num_pipelines.ceil_div(num_nodes).max(3). Right now, there is a real risk for indexing to outpace the merge pipeline.
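A sketch of that cap, assuming it applies per indexer and per index; usize::div_ceil (stable since Rust 1.73) plays the role of the ceil_div written above:

```rust
// For a given index, an indexer would never host more than this many
// pipelines, as proposed above.
fn max_pipelines_per_node(num_pipelines: usize, num_nodes: usize) -> usize {
    num_pipelines.div_ceil(num_nodes).max(3)
}

fn main() {
    assert_eq!(max_pipelines_per_node(8, 4), 3);  // ceil(8/4) = 2, but the cap never drops below 3
    assert_eq!(max_pipelines_per_node(16, 4), 4); // ceil(16/4) = 4
}
```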
evanxg852000 commented on May 22, 2025
Hi @rdettai, is there any consensus around this? How about the PR?
rdettai commented on Jun 10, 2025
Found 2 other issues overlapping with this: #4470 and #4630. I need to rework the PR to limit the number of pipelines per node to 3 instead of using the node capacity.