
Indexing scheduling unbalanced for Kafka source #5747


Description

rdettai (Collaborator)

Describe the bug
When using a SourceToScheduleType::NonSharded source (e.g. Kafka), the current implementation of the indexing scheduler systematically collocates all pipelines of a given source on the same indexer. For the Kafka source, this prevents distributing the indexing load of a given topic across indexers.

Note that this problem was already reported here. The proposed solution of setting a small cpu_capacity does not work because the scheduler scales the capacities to fit the workload before assigning the pipelines to the nodes.

Steps to reproduce (if applicable)
See test in comments.

Expected behavior
Pipelines with high throughputs should be more or less evenly distributed across indexers.

Possible solutions

  1. measure the actual load of each Kafka source (currently hardcoded to 4 CPUs) and use that for scheduling. This increases the risk of entering a rebalancing ping-pong between the control plane and the Kafka rebalancing protocol.
  2. for each source, first limit the maximum number of pipelines that can be assigned to each node according to its unscaled original capacity.
  3. (variant of 2) re-introduce a source parameter like max_num_pipelines_per_indexer so that users can at least manually force the distribution of the load of a given source/topic across nodes. This parameter would be pretty hard to configure properly (and hard to maintain for fluctuating workloads)

EDIT:
4) add a "num cpus per pipeline" parameter to the source, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.
5) (variant of 4) add an "average data rate" parameter to the source, which would have the same effect as "num cpus per pipeline" but would be easier for the user to configure (Quickwit internally converts the bandwidth to CPUs); see the sketch after this list.
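
A minimal sketch of the conversion in (5), assuming a pipeline at full capacity (4 CPUs, per the hardcoded value above) sustains a fixed reference throughput. The 20 MB/s constant and the helper name are assumptions for illustration, not the actual Quickwit API:

// Hypothetical helper: derive the scheduler's CPU estimate for a pipeline
// from a user-provided "average data rate" hint.
const PIPELINE_FULL_CAPACITY_MCPU: u32 = 4_000; // a pipeline is 4 CPUs
const PIPELINE_FULL_THROUGHPUT_MB_PER_SEC: u32 = 20; // assumed full-capacity rate

fn estimated_pipeline_mcpu(expected_mb_per_sec: u32) -> u32 {
    // Scale the 4-CPU default linearly with the expected data rate,
    // keeping the estimate strictly positive and at most the full capacity.
    (PIPELINE_FULL_CAPACITY_MCPU * expected_mb_per_sec / PIPELINE_FULL_THROUGHPUT_MB_PER_SEC)
        .clamp(100, PIPELINE_FULL_CAPACITY_MCPU)
}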

Configuration:
Main (but same behavior in 0.8).

Activity

rdettai (Collaborator, Author) commented on Apr 15, 2025

Rationale behind the current behavior:

  • Colocating pipelines for a source on the same node is important to improve merge quality
  • We favor a constant default for the estimated pipeline resource because we'd rather have a stable indexing plan than risk entering a rebalancing ping-pong between the control plane and Kafka
rdettai changed the title from "Indexing scheduling broken for Kafka source" to "Indexing scheduling unbalanced for Kafka source" on Apr 16, 2025

rdettai (Collaborator, Author) commented on Apr 16, 2025

Here is an interesting behavior of the scheduler.

Let's assign one source with four pipelines to a cluster of 4 nodes (8 CPUs each):

let mut problem: SchedulingProblem = SchedulingProblem::with_indexer_cpu_capacities(vec![
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
]);
// One source with 4 pipelines, each estimated at the full pipeline capacity (4 CPUs).
problem.add_source(
    4,
    NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
);
let old_solution = problem.new_solution();
let solution = solve(problem, old_solution);
for assignment in &solution.indexer_assignments {
    println!("{:?}", assignment);
}

Output:

IndexerAssignment { indexer_ord: 0, num_shards_per_source: {0: 1} }
IndexerAssignment { indexer_ord: 1, num_shards_per_source: {0: 1} }
IndexerAssignment { indexer_ord: 2, num_shards_per_source: {0: 1} }
IndexerAssignment { indexer_ord: 3, num_shards_per_source: {0: 1} }

This shows that when the capacity is not exceeded, pipelines are evenly balanced across nodes. We should have colocated at least 2 pipelines per indexer according to the principle stated above:

Colocating pipelines for a source on the same node is important to improve merge quality

Now if we increase the number of sources to 4 (same cluster capacity):

let mut problem: SchedulingProblem = SchedulingProblem::with_indexer_cpu_capacities(vec![
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
]);
// Four sources, each with 4 pipelines at full pipeline capacity.
for _ in 0..4 {
    problem.add_source(
        4,
        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
    );
}
let old_solution = problem.new_solution();
let solution = solve(problem, old_solution);
for assignment in &solution.indexer_assignments {
    println!("{:?}", assignment);
}

Output:

IndexerAssignment { indexer_ord: 0, num_shards_per_source: {3: 4} }
IndexerAssignment { indexer_ord: 1, num_shards_per_source: {2: 4} }
IndexerAssignment { indexer_ord: 2, num_shards_per_source: {1: 4} }
IndexerAssignment { indexer_ord: 3, num_shards_per_source: {0: 4} }

Each source gets all its partitions assigned to a single node.

hardboiled commented on Apr 22, 2025

Hey @rdettai, would there be a benefit in adding some sort of affinity config for pipelines? If you know that some topics will push a lot more data than others, you could instruct Quickwit indexers to share the load more; whereas if the topics' ingest variance is fairly normalized, the dev would just leave the default config.

rdettai (Collaborator, Author) commented on Apr 23, 2025

Do you have a specific UX in mind? One option would be to specify the expected bandwidth for the topic, but I feel it would be "stressful" for users who know that their workloads vary a lot over time.

The benefit of this PR is that it doesn't add an extra configuration, and it still effectively allows spreading the load. For instance, if you have 8-CPU nodes, setting a source to num_pipeline=8 would spread it across up to 4 nodes (a pipeline is 4 CPUs, so the load is balanced as soon as you have more than 2).
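
For illustration, that scenario can be written with the same test API as the scheduling examples above (setup only, a sketch rather than a verified test):

// Sketch of the num_pipeline=8 scenario: total load is 8 * 4000 mCPU
// = 32,000 mCPU, which cannot fit on fewer than 4 of the 8,000 mCPU nodes,
// so the solver has to spread the source across the cluster.
let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
    mcpu(8000), mcpu(8000), mcpu(8000), mcpu(8000),
]);
problem.add_source(
    8, // num_pipeline = 8
    NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
);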

fulmicoton-dd (Collaborator) commented on Apr 23, 2025

I think the ticket does not fully describe the problem.
The reason we are collocating pipelines from the same index on the same indexer is to get better pruning at search time.
This is not a bug: this is a feature.

The purpose of the cpu capacity parameter on indexers is to act as a throttle: we accept some load imbalance precisely when it helps improve this pruning. Setting it to 0 has the effect of favoring balance.

Unfortunately this balance is only measured in terms of pipelines. Quickwit has a mechanism to assess the throughput of a pipeline, but we never use it for Kafka. We judge that, even more than ingest v2, Kafka is subject to alternating between a catch-up mode and indexing at the horizon.

So the issue with collocating pipelines from the same index on the same indexer is that the load of Kafka pipelines can be very unequal, which leads to an imbalanced workload across the different nodes.
This problem is not caused by that behavior, it is exacerbated by it.

I advise against (1) for the reason already discussed.

(2) only solves the problem partially (e.g. 4 nodes, 3 indexes with 1 pipeline each indexing at 20 MB/s, and 12 indexes with a throughput of 100 KB/s).

For exhaustiveness: right now I believe we do not know how to run two merges concurrently, so the merge pipeline could rapidly end up being the bottleneck if we had ~3 indexing pipelines for the same index on the same node.
I cannot remember if the control plane already has logic preventing this or not.

I would add:

Solution 4: add a "num cpus per pipeline" parameter, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.

daniele-br commented on Apr 28, 2025

Solution 4: add a "num cpus per pipeline" parameter, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.

OK, but just to be clear, we're in the opposite situation. We have 1-2 topics that have 70% of the data and a lower amount of data on the other 20 topics. Not sure what implications that has.

fulmicoton-dd (Collaborator) commented on Apr 29, 2025

@daniele-br This was my understanding. You would then have to mark all the topics with a lower amount of data as having a lower amount of data. I assume your comment was about reporting the oddity of having to "special case" the "majority".

rdettai (Collaborator, Author) commented on Apr 29, 2025

(2) only solves the problem partially (e.g. 4 nodes, 3 indexes with 1 pipeline each indexing at 20 MB/s, and 12 indexes with a throughput of 100 KB/s).

@fulmicoton-dd Yes, it does not address this specific workload.

  • Don't you think that adding this extra logic is still worth it? Even if we add an extra parameter (either (4) or (5)), it would still be useful to improve the default behavior when the parameter is not specified by the user. I agree that it makes the scheduling more complex, which is something we might want to avoid, but besides that I think it is a net improvement.
  • If we add an extra parameter to hint at the CPU usage of each pipeline, should we have expected_cpus_per_pipeline: CpuCapacity or expected_throughput_per_pipeline: ByteSize? Exposing the config in terms of CPU implies that the user is more aware of the internals of how pipelines use CPUs, which makes sense if we want to force users to dig deeper before tweaking it. Exposing the config as an expected bandwidth is closer to the user's perspective, and thus easier to understand (see the sketch after this list).
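
As a sketch of how option (5) would plug in, the throughput hint would replace the hardcoded full-capacity estimate passed to the scheduling problem in the examples above. This is hypothetical wiring, using the hypothetical conversion helper sketched under the issue description:

problem.add_source(
    4,
    // Instead of the hardcoded PIPELINE_FULL_CAPACITY, pass the CPU estimate
    // derived from expected_throughput_per_pipeline (here: a 2 MB/s topic).
    NonZeroU32::new(estimated_pipeline_mcpu(2)).unwrap(),
);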

fulmicoton-dd (Collaborator) commented on May 14, 2025

expected_throughput_per_pipeline: ByteSize is probably best.

fulmicoton-dd (Collaborator) commented on May 14, 2025

For (2),

Don't you think that adding this extra logic is still worth it? Even if we add an extra parameter (either (4) or (5)), it would still be useful to improve the default behavior when the parameter is not specified by the user. I agree that it makes the scheduling more complex, which is something we might want to avoid, but besides that I think it is a net improvement.

Not really. If it is not there yet, I would like a non-configurable maximum number of pipelines for a given index on a single node, set to num_pipelines.ceil_div(num_nodes).max(3). Right now, there is a real risk of indexing outpacing the merge pipeline.
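
A minimal sketch of that cap, assuming it would be computed per index at scheduling time (u32::div_ceil is the standard-library spelling of the ceil_div above):

// Hypothetical helper: hard limit on pipelines of one index per node.
// Spreads pipelines evenly across nodes, but never caps below 3.
fn max_pipelines_per_node_for_index(num_pipelines: u32, num_nodes: u32) -> u32 {
    num_pipelines.div_ceil(num_nodes).max(3)
}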

evanxg852000 (Collaborator) commented on May 22, 2025

Hi @rdettai, is there any consensus around this? How about the PR?

rdettai (Collaborator, Author) commented on Jun 10, 2025

Found 2 other issues overlapping with this: #4470 and #4630. I need to rework the PR to limit the number of pipelines per node to 3 instead of using the node capacity.
