feat(taskbroker): Support multiple topics#668
Conversation
With one consumer per topic, the consumer/pipeline metrics raced (gauges) or merged across topics (counters/histograms). Add a `topic` tag to the rebalance gauges/counters, the activation writer and batcher metrics, and the deserialize payload-size histograms. Store-level metrics are left untagged. Also demote sqlite's per-consumer assign_partitions warn to debug. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A topic with raw mode but missing namespace/application/taskname/ processing_deadline_duration passed config validation and only panicked later when the consumer built its deserializer (via .expect()). Validate completeness (and that the application is in worker_map, and the retry topic differs from the raw topic) in normalize_and_validate so it's a clean config error instead. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The forwarding producer authenticates against the deadletter cluster and only overrides bootstrap.servers, so the deadletter cluster is the only target where its credentials reliably work. The consumer-side batcher and the upkeep path disagreed on the default forward cluster when demoted_topic_cluster is unset: the batcher used each topic's own cluster while upkeep used the deadletter cluster for multi-topic. Since the demoted topic is a single global topic, default both to the deadletter cluster. Unchanged for legacy single-topic, where the deadletter cluster address defaults to the consumed cluster's address. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Follow-up to the forward-cluster fix: the upkeep comment overstated that legacy behavior is "unchanged" (it only is when kafka_deadletter_cluster is unset), and the demoted_topic_cluster doc still claimed it defaults to the consumed cluster. Correct both to say it defaults to the deadletter cluster. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 2724130. Configure here.
| }; | ||
| metrics::counter!( | ||
| "consumer.inflight_activation_writer.backpressure", | ||
| "topic" => self.config.topic.clone(), |
There was a problem hiding this comment.
Shared store backpressure race
Medium Severity
With multiple consumable topics, each topic pipeline has its own ActivationWriter on the same store, but flush still decides backpressure from a single count_depths snapshot with no coordination. Two writers can both pass the pending, delay, processing, or DB size checks and insert together, so shared max_* limits can be exceeded under concurrent consumption.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 2724130. Configure here.
There was a problem hiding this comment.
this is the kind of thing i'd like to scope into a followup pr. the current way we manage db connections (proportional to the amount of topics we connect to) is fundamentally leading to bad performance. in my mind the fix is to reuse the activationwriter and -batcher across topics, but this seemed like a more invasive change as it would require me to break them out of the consumer-specific pipeline.
reusing those services across topics would lower the amount of db transactions and also fix this race.
There was a problem hiding this comment.
I agree with this. Also, there isn't a "hard" cap on the DB size, we just want to make sure the DB can't grow unbounded. So this race might let one extra write in, but then things will still backpressure after that.
evanh
left a comment
There was a problem hiding this comment.
Agree that in the future we should have ActivationBatcher and ActivationWriter be standalone components that are shared across all consumers.
| }; | ||
| metrics::counter!( | ||
| "consumer.inflight_activation_writer.backpressure", | ||
| "topic" => self.config.topic.clone(), |
There was a problem hiding this comment.
I agree with this. Also, there isn't a "hard" cap on the DB size, we just want to make sure the DB can't grow unbounded. So this race might let one extra write in, but then things will still backpressure after that.
…116758) See getsentry/taskbroker#667 for details. Same kafka config migration as the self-hosted change, but for the `ingest-profiles` taskbroker in devservices. Deprecation warnings land as per getsentry/taskbroker#663 `ingest-profiles` runs in raw mode, and raw mode now requires an explicit retry topic: raw payloads aren't activations, so retries can't loop back into the `profiles` topic. Retries now go to the main `taskworker` topic so the existing taskbroker picks them up instead of running another broker. Depends on getsentry/taskbroker#668, which wires up per-topic raw mode and enforces the retry-topic requirement. Once that ships, the current devenv config breaks without this change. Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>


Continuing from #663, this is the bare minimum for multi-topic to be useful. We spawn one consumer per topic, and then each consumer has its own pipeline, activationbatcher, but shares an activationstore.
What is deliberately left out of this PR and left for follow-up:
topiccolumn now, but actually due to other reasons (slicing) we're reconsidering whether evenpartitionshould even be there. The main reasonpartitionexists is to avoid (row-level?) lock contention when many brokers share an alloydb instance. This, however, makes draining and migration of topics more complicated, so we are considering using another "sharding key" in pg entirely.Metrics are not fully fixed yet.I fixed everything but realistically there will be huge churn for our dashboards.Other things fixed:
This, to me, is the bare minimum that is useful for consolidating existing low-traffic/low-cost pools. I want to use this PR as-is to achieve that and come back to the other points async.
ref STREAM-1042
ref /STREAM-1096