From 48b02e39e0ff83c05b942244d7f188ba99e24b93 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 11:49:42 +0200 Subject: [PATCH 01/10] Minor refactor for config and admin client creation --- src/config.rs | 191 +++++++++++++++++++++++------------ src/kafka/deserialize.rs | 13 ++- src/kafka/deserialize_raw.rs | 48 ++++----- 3 files changed, 165 insertions(+), 87 deletions(-) diff --git a/src/config.rs b/src/config.rs index 5231b2d4..ea864609 100644 --- a/src/config.rs +++ b/src/config.rs @@ -89,6 +89,35 @@ impl ClusterConfig { || self.ssl_certificate_location.is_some() || self.ssl_key_location.is_some() } + + /// Apply this cluster's `bootstrap.servers` and any configured sasl/ssl + /// auth onto an rdkafka `ClientConfig`. Shared by the consumer, producer and + /// admin config builders so they all authenticate identically. + fn apply_to(&self, config: &mut ClientConfig) { + config.set("bootstrap.servers", self.address.clone()); + + if let Some(ref sasl_mechanism) = self.sasl_mechanism { + config.set("sasl.mechanism", sasl_mechanism); + } + if let Some(ref sasl_username) = self.sasl_username { + config.set("sasl.username", sasl_username); + } + if let Some(ref sasl_password) = self.sasl_password { + config.set("sasl.password", sasl_password); + } + if let Some(ref security_protocol) = self.security_protocol { + config.set("security.protocol", security_protocol); + } + if let Some(ref ssl_ca_location) = self.ssl_ca_location { + config.set("ssl.ca.location", ssl_ca_location); + } + if let Some(ref ssl_certificate_location) = self.ssl_certificate_location { + config.set("ssl.certificate.location", ssl_certificate_location); + } + if let Some(ref ssl_private_key_location) = self.ssl_key_location { + config.set("ssl.key.location", ssl_private_key_location); + } + } } #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] @@ -618,8 +647,9 @@ impl Config { let uses_new_format = !self.kafka_topics.is_empty() || !self.kafka_clusters.is_empty(); // Any explicitly-set deprecated field describing a cluster (the main - // consumed cluster or the deadletter cluster). kafka_deadletter_topic is - // NOT deprecated and is intentionally excluded. + // consumed cluster or the deadletter cluster) or the deprecated global + // raw mode. kafka_deadletter_topic is NOT deprecated and is intentionally + // excluded. let uses_legacy = self.kafka_topic.is_some() || self.kafka_cluster.is_some() || self.kafka_consumer_group.is_some() @@ -637,7 +667,10 @@ impl Config { || self.kafka_deadletter_sasl_password.is_some() || self.kafka_deadletter_ssl_ca_location.is_some() || self.kafka_deadletter_ssl_certificate_location.is_some() - || self.kafka_deadletter_ssl_key_location.is_some(); + || self.kafka_deadletter_ssl_key_location.is_some() + // Global raw mode is a deprecated legacy field; in the new format raw + // mode is configured per topic via kafka_topics..raw. + || self.raw_mode; if uses_new_format && uses_legacy { return Err(anyhow!( @@ -819,8 +852,25 @@ impl Config { })?; } - // Validate exactly one consumable topic - self.consumable_topic()?; + // Validate at least one consumable topic. + let consumable = self.consumable_topics()?; + + // Multi-topic consumption is only supported on the sqlite adapter for + // now. The postgres adapter filters claims by a single shared partition + // list, but those partition numbers aren't unique across topics, so the + // filter would mix partitions from different topics together. Note this + // filtering exists only to avoid lock contention between brokers, not for + // correctness; supporting multi-topic on postgres means reworking how we + // avoid that contention (e.g. filtering by (topic, partition) or another + // mechanism entirely). Reject the combination here, before any consumer + // spawns. + if consumable.len() > 1 && self.database_adapter == DatabaseAdapter::Postgres { + return Err(anyhow!( + "multi-topic consumption ({} consumable topics) is not supported with the \ + postgres database adapter; use the sqlite adapter or a single consumable topic", + consumable.len() + )); + } // The deadletter topic must be a declared topic so the producer can // resolve its cluster. In legacy mode it was added above; in the new @@ -840,8 +890,22 @@ impl Config { // would be published to the wrong brokers. In the legacy format the // retry topic is registered above; in the new format the user must // declare it in kafka_topics. - let (consumed_topic, _) = self.consumable_topic()?; - let retry_target = self.kafka_retry_topic.as_deref().unwrap_or(consumed_topic); + // With a single consumable topic, retries fall back to that topic when + // no retry topic is configured. With multiple consumable topics there is + // no unambiguous fallback, so a dedicated retry topic is mandatory (the + // design doc specifies a single retry topic shared across all consumed + // topics rather than per-topic retry routing). + let retry_target = match self.kafka_retry_topic.as_deref() { + Some(retry_topic) => retry_topic, + None if consumable.len() == 1 => consumable[0].0, + None => { + return Err(anyhow!( + "kafka_retry_topic is required when consuming from multiple topics ({} \ + consumable topics configured)", + consumable.len() + )); + } + }; let retry_topic_config = self.kafka_topics.get(retry_target).ok_or_else(|| { Box::new(figment::Error::from(format!( "kafka_retry_topic '{retry_target}' is not defined in kafka_topics" @@ -865,6 +929,28 @@ impl Config { Ok(()) } + /// Get all consumable (non-`produce_only`) topics and their configs, in + /// `kafka_topics` (BTreeMap) order. Returns an error only when there are no + /// consumable topics. This is the multi-topic entry point; prefer it over + /// [`Config::consumable_topic`] for anything that should support more than + /// one consumed topic. + pub fn consumable_topics(&self) -> Result, Box> { + let consumable: Vec<(&str, &TopicConfig)> = self + .kafka_topics + .iter() + .filter(|(_, cfg)| !cfg.produce_only) + .map(|(name, cfg)| (name.as_str(), cfg)) + .collect(); + + if consumable.is_empty() { + return Err(Box::new(figment::Error::from( + "no consumable topic configured (all topics have produce_only: true)".to_owned(), + ))); + } + + Ok(consumable) + } + /// Get the single consumable topic and its config. /// Returns an error if there are zero or multiple consumable topics. pub fn consumable_topic(&self) -> Result<(&str, &TopicConfig), Box> { @@ -902,13 +988,25 @@ impl Config { .ok_or_else(|| Box::new(figment::Error::from(format!("unknown cluster: {}", name)))) } - /// Convert the application Config into rdkafka::ClientConfig for consumer. - /// Uses the single consumable topic's cluster. + /// Convert the application Config into rdkafka::ClientConfig for the consumer + /// of the single consumable topic. /// Panics if config wasn't validated (call from_args, not extract directly). pub fn kafka_consumer_config(&self) -> ClientConfig { - let (_, topic_config) = self + let (topic_name, _) = self .consumable_topic() .expect("consumable_topic failed - was config validated?"); + self.kafka_consumer_config_for(topic_name) + } + + /// Convert the application Config into rdkafka::ClientConfig for the consumer + /// of a specific topic. Each consumed topic has its own consumer (own + /// `group.id` and cluster), so multi-topic spawns one consumer per topic. + /// Panics if `topic_name` isn't a declared topic (call from_args first). + pub fn kafka_consumer_config_for(&self, topic_name: &str) -> ClientConfig { + let topic_config = self + .kafka_topics + .get(topic_name) + .unwrap_or_else(|| panic!("unknown topic '{topic_name}' - was config validated?")); let cluster = self .cluster(&topic_config.cluster) .expect("cluster lookup failed - was config validated?"); @@ -925,9 +1023,9 @@ impl Config { .clone() .unwrap_or_else(|| self.kafka_auto_offset_reset.clone()); - let mut new_config = ClientConfig::new(); - let config = new_config - .set("bootstrap.servers", cluster.address.clone()) + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config .set("group.id", topic_config.consumer_group.clone()) .set("session.timeout.ms", session_timeout_ms.to_string()) .set("enable.partition.eof", "false") @@ -939,29 +1037,21 @@ impl Config { .set("auto.offset.reset", auto_offset_reset) .set("enable.auto.offset.store", "false"); - if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { - config.set("sasl.mechanism", sasl_mechanism); - } - if let Some(ref sasl_username) = cluster.sasl_username { - config.set("sasl.username", sasl_username); - } - if let Some(ref sasl_password) = cluster.sasl_password { - config.set("sasl.password", sasl_password); - } - if let Some(ref security_protocol) = cluster.security_protocol { - config.set("security.protocol", security_protocol); - } - if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { - config.set("ssl.ca.location", ssl_ca_location); - } - if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { - config.set("ssl.certificate.location", ssl_certificate_location); - } - if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { - config.set("ssl.key.location", ssl_private_key_location); - } + config + } + + /// Build an rdkafka::ClientConfig for an admin client on a named cluster. + /// Carries only `bootstrap.servers` + that cluster's auth (no consumer + /// settings), so topic creation targets the correct brokers. + /// Panics if the cluster isn't declared (call from_args first). + pub fn kafka_admin_config(&self, cluster_name: &str) -> ClientConfig { + let cluster = self + .cluster(cluster_name) + .expect("cluster lookup failed - was config validated?"); - config.clone() + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config } /// The cluster the deadletter / forwarding producer connects to: the @@ -985,33 +1075,10 @@ impl Config { pub fn kafka_producer_config(&self) -> ClientConfig { let cluster = self.kafka_producer_cluster(); - let mut new_config = ClientConfig::new(); - let config = new_config - .set("bootstrap.servers", cluster.address.clone()) - .set("message.max.bytes", format!("{}", self.max_message_size)); - if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { - config.set("sasl.mechanism", sasl_mechanism); - } - if let Some(ref sasl_username) = cluster.sasl_username { - config.set("sasl.username", sasl_username); - } - if let Some(ref sasl_password) = cluster.sasl_password { - config.set("sasl.password", sasl_password); - } - if let Some(ref security_protocol) = cluster.security_protocol { - config.set("security.protocol", security_protocol); - } - if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { - config.set("ssl.ca.location", ssl_ca_location); - } - if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { - config.set("ssl.certificate.location", ssl_certificate_location); - } - if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { - config.set("ssl.key.location", ssl_private_key_location); - } - - config.clone() + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config.set("message.max.bytes", format!("{}", self.max_message_size)); + config } } diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 0ab18d0e..61f484b1 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -18,10 +18,19 @@ pub struct DeserializeConfig { } impl DeserializeConfig { - pub fn from_config(config: &Config) -> Self { + /// Build the deserializer config for a single consumed topic. Raw mode is + /// taken from that topic's `kafka_topics..raw`, so each consumer + /// (legacy single-topic included) deserializes according to its own topic. + pub fn from_topic(config: &Config, topic_name: &str) -> Self { + let raw_config = config + .kafka_topics + .get(topic_name) + .and_then(|topic| topic.raw.as_ref()) + .map(|raw| RawConfig::from_topic(config, topic_name, raw)); + Self { activation_config: DeserializeActivationConfig::from_config(config), - raw_config: RawConfig::from_config(config), + raw_config, retry_topic: config.kafka_retry_topic.clone(), } } diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 89577850..34f1b505 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -10,7 +10,7 @@ use rdkafka::message::OwnedMessage; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use uuid::Uuid; -use crate::config::Config; +use crate::config::{Config, RawModeConfig}; use crate::store::activation::{Activation, ActivationStatus}; use super::deserialize_activation::bucket_from_id; @@ -29,43 +29,45 @@ pub struct RawConfig { } impl RawConfig { - pub fn from_config(config: &Config) -> Option { - if !config.raw_mode { - return None; - } - let application = config - .raw_application + /// Build the raw-mode deserializer config for a single topic from that + /// topic's [`RawModeConfig`]. The legacy global `raw_*` fields are migrated + /// into the topic's `raw` during config normalization, so this is the single + /// runtime source of truth for both legacy and multi-topic formats. + pub fn from_topic(config: &Config, topic_name: &str, raw: &RawModeConfig) -> Self { + let application = raw + .application .clone() - .expect("raw_application required when raw_mode is enabled"); + .expect("raw application required when a topic enables raw mode"); assert!( config.worker_map.contains_key(&application), - "raw_application '{}' must exist in worker_map", + "raw application '{}' must exist in worker_map", application ); + // A raw topic's messages aren't activations, so its retries must go to a + // separate (activation-encoded) retry topic, never back to itself. if let Some(ref retry_topic) = config.kafka_retry_topic { - let (main_topic, _) = config - .consumable_topic() - .expect("no consumable topic configured"); assert!( - retry_topic != main_topic, - "kafka_retry_topic cannot equal kafka_topic when raw_mode is enabled" + retry_topic != topic_name, + "kafka_retry_topic cannot equal raw topic '{topic_name}'" ); } - Some(Self { - namespace: config - .raw_namespace + Self { + namespace: raw + .namespace .clone() - .expect("raw_namespace required when raw_mode is enabled"), + .expect("raw namespace required when a topic enables raw mode"), application, - taskname: config - .raw_taskname + taskname: raw + .taskname .clone() - .expect("raw_taskname required when raw_mode is enabled"), - processing_deadline_duration: config.raw_processing_deadline_duration, - }) + .expect("raw taskname required when a topic enables raw mode"), + processing_deadline_duration: raw + .processing_deadline_duration + .expect("raw processing_deadline_duration required when a topic enables raw mode"), + } } } From a0e0351a6cc7274f43d94cdda46ab0199d203fe8 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:28:08 +0200 Subject: [PATCH 02/10] more plumbing, spawn many consumers --- src/config.rs | 135 ++++++++++++++++---------------- src/kafka/activation_batcher.rs | 25 +++--- src/kafka/admin.rs | 59 ++++++++++---- src/main.rs | 74 ++++++++--------- src/test_utils.rs | 6 +- src/upkeep.rs | 46 ++++++----- 6 files changed, 194 insertions(+), 151 deletions(-) diff --git a/src/config.rs b/src/config.rs index ea864609..aa26df2d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -890,22 +890,39 @@ impl Config { // would be published to the wrong brokers. In the legacy format the // retry topic is registered above; in the new format the user must // declare it in kafka_topics. - // With a single consumable topic, retries fall back to that topic when - // no retry topic is configured. With multiple consumable topics there is - // no unambiguous fallback, so a dedicated retry topic is mandatory (the - // design doc specifies a single retry topic shared across all consumed - // topics rather than per-topic retry routing). - let retry_target = match self.kafka_retry_topic.as_deref() { - Some(retry_topic) => retry_topic, - None if consumable.len() == 1 => consumable[0].0, - None => { + // Normalize the retry topic so downstream code can always rely on it + // being set: + // - explicitly configured: used as-is. + // - single non-raw consumed topic: retries loop back to that topic. + // - raw mode: raw messages aren't activations, so retries must go to a + // separate activation-encoded topic; a retry topic is mandatory. + // - multiple consumed topics: no unambiguous fallback, so a single + // shared retry topic is mandatory. + if self.kafka_retry_topic.is_none() { + let has_raw = consumable.iter().any(|(_, cfg)| cfg.raw.is_some()); + let count = consumable.len(); + let single_topic = (count == 1).then(|| consumable[0].0.to_owned()); + + if has_raw { return Err(anyhow!( - "kafka_retry_topic is required when consuming from multiple topics ({} \ - consumable topics configured)", - consumable.len() + "kafka_retry_topic must be set explicitly when a consumed topic uses raw mode" )); } - }; + match single_topic { + Some(topic) => self.kafka_retry_topic = Some(topic), + None => { + return Err(anyhow!( + "kafka_retry_topic is required when consuming from multiple topics ({} \ + consumable topics configured)", + count + )); + } + } + } + let retry_target = self + .kafka_retry_topic + .as_deref() + .expect("kafka_retry_topic is set above"); let retry_topic_config = self.kafka_topics.get(retry_target).ok_or_else(|| { Box::new(figment::Error::from(format!( "kafka_retry_topic '{retry_target}' is not defined in kafka_topics" @@ -931,9 +948,9 @@ impl Config { /// Get all consumable (non-`produce_only`) topics and their configs, in /// `kafka_topics` (BTreeMap) order. Returns an error only when there are no - /// consumable topics. This is the multi-topic entry point; prefer it over - /// [`Config::consumable_topic`] for anything that should support more than - /// one consumed topic. + /// consumable topics. This is the sole accessor for consumed topics; callers + /// that only handle a single topic should iterate and select explicitly + /// rather than assuming exactly one. pub fn consumable_topics(&self) -> Result, Box> { let consumable: Vec<(&str, &TopicConfig)> = self .kafka_topics @@ -951,33 +968,14 @@ impl Config { Ok(consumable) } - /// Get the single consumable topic and its config. - /// Returns an error if there are zero or multiple consumable topics. - pub fn consumable_topic(&self) -> Result<(&str, &TopicConfig), Box> { - let mut consumable = self - .kafka_topics - .iter() - .filter(|(_, cfg)| !cfg.produce_only); - - let first = consumable.next().ok_or_else(|| { - Box::new(figment::Error::from( - "no consumable topic configured (all topics have produce_only: true)".to_owned(), - )) - })?; - - if consumable.next().is_some() { - let count = self - .kafka_topics - .values() - .filter(|t| !t.produce_only) - .count(); - return Err(Box::new(figment::Error::from(format!( - "multi-topic consumption is not yet supported: {} consumable topics configured, maximum is 1", - count - )))); - } - - Ok((first.0.as_str(), first.1)) + /// The topic retries are produced to. `normalize_and_validate` always sets + /// `kafka_retry_topic` (it defaults to the single consumed topic when only + /// one non-raw topic is configured), so this is infallible after validation. + /// Panics if config wasn't validated (call from_args, not extract directly). + pub fn retry_topic(&self) -> &str { + self.kafka_retry_topic + .as_deref() + .expect("kafka_retry_topic unset - was config validated?") } /// Get cluster config by name. @@ -988,16 +986,6 @@ impl Config { .ok_or_else(|| Box::new(figment::Error::from(format!("unknown cluster: {}", name)))) } - /// Convert the application Config into rdkafka::ClientConfig for the consumer - /// of the single consumable topic. - /// Panics if config wasn't validated (call from_args, not extract directly). - pub fn kafka_consumer_config(&self) -> ClientConfig { - let (topic_name, _) = self - .consumable_topic() - .expect("consumable_topic failed - was config validated?"); - self.kafka_consumer_config_for(topic_name) - } - /// Convert the application Config into rdkafka::ClientConfig for the consumer /// of a specific topic. Each consumed topic has its own consumer (own /// `group.id` and cluster), so multi-topic spawns one consumer per topic. @@ -1269,7 +1257,7 @@ mod tests { // kafka_consumer_group is unset in the yaml, so the legacy field // stays None and normalization applies the "taskworker" default. assert_eq!(config.kafka_consumer_group, None); - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "error-tasks"); assert_eq!(topic_config.consumer_group, "taskworker"); assert_eq!(config.kafka_auto_offset_reset, "earliest".to_owned()); @@ -1331,7 +1319,7 @@ mod tests { // Zero-config: legacy fields stay None, but normalization applies // the historical "taskworker" default as the consumable topic. assert_eq!(config.kafka_topic, None); - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "taskworker"); assert_eq!( config.cluster(&topic_config.cluster).unwrap().address, @@ -1384,7 +1372,7 @@ mod tests { fn test_kafka_consumer_config() { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("bootstrap.servers").unwrap(), @@ -1404,7 +1392,7 @@ mod tests { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("security.protocol").unwrap(), @@ -1439,7 +1427,7 @@ mod tests { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("ssl.ca.location").unwrap(), @@ -1585,6 +1573,7 @@ mod tests { "config.yaml", r#" kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry kafka_topics: profiles: @@ -1652,7 +1641,7 @@ kafka_clusters: assert!(!clusters.contains_key("default")); // Test consumable_topic() and cluster() helpers - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); assert_eq!(topic_config.cluster, "profiles-cluster"); @@ -1711,7 +1700,7 @@ kafka_clusters: assert!(!clusters.contains_key("default")); // Test consumable_topic() helper - let (topic_name, _) = config.consumable_topic().unwrap(); + let (topic_name, _) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); Ok(()) @@ -1812,12 +1801,18 @@ kafka_clusters: } #[test] - fn test_multi_topic_rejects_multiple_consumable_topics() { + fn test_multi_topic_rejected_on_postgres() { Jail::expect_with(|jail| { - // Two consumable topics - the guard for this PR rejects this. + // Multiple consumable topics are allowed on sqlite but rejected on + // postgres, whose claim filtering can't distinguish partitions + // across topics. jail.create_file( "config.yaml", r#" +database_adapter: postgres +kafka_deadletter_topic: tasks-dlq +kafka_retry_topic: tasks-retry + kafka_topics: profiles: cluster: my-cluster @@ -1825,6 +1820,14 @@ kafka_topics: subscriptions: cluster: my-cluster consumer_group: taskbroker-subscriptions + tasks-retry: + cluster: my-cluster + consumer_group: taskbroker-retry + produce_only: true + tasks-dlq: + cluster: my-cluster + consumer_group: taskbroker-dlq + produce_only: true kafka_clusters: my-cluster: @@ -1838,7 +1841,7 @@ kafka_clusters: let err = Config::from_args(&args).unwrap_err(); assert!( err.to_string() - .contains("multi-topic consumption is not yet supported"), + .contains("not supported with the postgres database adapter"), "unexpected error: {}", err ); @@ -1889,7 +1892,7 @@ kafka_clusters: assert!(topics.get("profiles-dlq").unwrap().produce_only); // consumable_topic() returns the one consumable topic - let (topic_name, _) = config.consumable_topic().unwrap(); + let (topic_name, _) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); Ok(()) @@ -2043,7 +2046,7 @@ kafka_clusters: config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("profiles"); // Per-topic values win over the globals. assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "12000"); @@ -2090,7 +2093,7 @@ kafka_clusters: config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("profiles"); // No per-topic overrides, so the globals are used. assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "7000"); diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index a5a1ba73..f5748f5a 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -29,11 +29,14 @@ pub struct ActivationBatcherConfig { } impl ActivationBatcherConfig { - /// Convert from application configuration into ActivationBatcher config. - pub fn from_config(config: &Config) -> Self { - let (topic_name, topic_config) = config - .consumable_topic() - .expect("no consumable topic configured"); + /// Convert from application configuration into ActivationBatcher config for a + /// single consumed topic. Each consumer has its own batcher, so the topic is + /// passed explicitly rather than derived from "the" consumable topic. + pub fn from_topic(config: &Config, topic_name: &str) -> Self { + let topic_config = config + .kafka_topics + .get(topic_name) + .unwrap_or_else(|| panic!("unknown topic '{topic_name}'")); let cluster = config .cluster(&topic_config.cluster) .expect("cluster not found"); @@ -252,7 +255,7 @@ demoted_namespaces: config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -275,7 +278,7 @@ demoted_namespaces: config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -304,7 +307,7 @@ demoted_namespaces: let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -334,7 +337,7 @@ demoted_namespaces: let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -380,11 +383,11 @@ demoted_topic: taskworker-demoted"#; config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); - let (_, topic_config) = config.consumable_topic().unwrap(); + let (_, topic_config) = config.consumable_topics().unwrap()[0]; let cluster_address = config .cluster(&topic_config.cluster) .unwrap() diff --git a/src/kafka/admin.rs b/src/kafka/admin.rs index 5cbfcb51..36f11e94 100644 --- a/src/kafka/admin.rs +++ b/src/kafka/admin.rs @@ -1,31 +1,56 @@ -use anyhow::Error; +use anyhow::{Error, anyhow}; use rdkafka::ClientConfig; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use rdkafka::types::RDKafkaErrorCode; use tracing::info; +/// Create the given topics on the cluster described by `admin_config` if they +/// don't already exist. `topics` pairs each topic name with its partition count. +/// +/// `admin_config` should carry only `bootstrap.servers` + auth (see +/// [`Config::kafka_admin_config`](crate::config::Config::kafka_admin_config)) so +/// the topics are created on the right cluster. A pre-existing topic +/// (`TopicAlreadyExists`) is treated as success; any other per-topic failure is +/// surfaced so a misconfigured or unreachable cluster fails loudly at startup. pub async fn create_missing_topics( - kafka_client_config: ClientConfig, - topic: &str, - default_topic_partitions: i32, + admin_config: ClientConfig, + topics: &[(&str, i32)], ) -> Result<(), Error> { - let admin_client: AdminClient<_> = kafka_client_config + if topics.is_empty() { + return Ok(()); + } + + let admin_client: AdminClient<_> = admin_config .create() - .expect("Unable to reate rdkafka admin client"); + .map_err(|e| anyhow!("Unable to create rdkafka admin client: {e}"))?; info!( - "Creating topic {:?} with {} partitions if it does not already exists", - topic, default_topic_partitions + "Creating topics {:?} if they do not already exist", + topics ); - admin_client - .create_topics( - &vec![NewTopic::new( - topic, - default_topic_partitions, - TopicReplication::Fixed(1), - )], - &AdminOptions::new(), - ) + let new_topics: Vec = topics + .iter() + .map(|(name, partitions)| NewTopic::new(name, *partitions, TopicReplication::Fixed(1))) + .collect(); + + let results = admin_client + .create_topics(&new_topics, &AdminOptions::new()) .await?; + // `create_topics` returns one result per topic; a request-level error is + // already propagated above by `?`. Tolerate topics that already exist, but + // surface every other per-topic failure (auth denied, invalid config, ...). + for result in results { + match result { + Ok(_) => {} + Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => { + info!("Topic {:?} already exists, skipping", topic); + } + Err((topic, code)) => { + return Err(anyhow!("Failed to create topic {:?}: {}", topic, code)); + } + } + } + Ok(()) } diff --git a/src/main.rs b/src/main.rs index dd3f9939..9f938ebe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; @@ -75,25 +75,18 @@ async fn main() -> Result<(), Error> { // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { - let kafka_client_config = config.kafka_consumer_config(); - let (main_topic, _) = config - .consumable_topic() - .map_err(|e| anyhow!("invalid config: {}", e))?; - create_missing_topics( - kafka_client_config.clone(), - main_topic, - config.default_topic_partitions, - ) - .await?; - - // Create retry topic if configured - if let Some(ref retry_topic) = config.kafka_retry_topic { - create_missing_topics( - kafka_client_config, - retry_topic, - config.default_topic_partitions, - ) - .await?; + // Group every declared topic by its cluster so each is created on the + // right brokers (main, retry, deadletter and produce-only topics, which + // may live on different clusters). + let mut topics_by_cluster: BTreeMap<&str, Vec<(&str, i32)>> = BTreeMap::new(); + for (topic_name, topic_config) in &config.kafka_topics { + topics_by_cluster + .entry(topic_config.cluster.as_str()) + .or_default() + .push((topic_name.as_str(), config.default_topic_partitions)); + } + for (cluster, topics) in topics_by_cluster { + create_missing_topics(config.kafka_admin_config(cluster), &topics).await?; } } @@ -157,25 +150,30 @@ async fn main() -> Result<(), Error> { } }); - // Consumer from kafka - let consumer_task = taskbroker::tokio::spawn({ + // Consumer(s) from kafka. Each consumed topic gets its own consumer (own + // group.id and cluster), so we spawn one consumer task per consumable topic, + // all sharing the one activation store. + let consumer_topics: Vec = config + .consumable_topics() + .expect("invalid config: no consumable topic") + .into_iter() + .map(|(name, _)| name.to_owned()) + .collect(); + + let mut consumer_tasks: Vec<(String, JoinHandle>)> = Vec::new(); + for topic in consumer_topics { let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); + let task_topic = topic.clone(); - // Build list of topics to consume from - let (main_topic, _) = consumer_config - .consumable_topic() - .expect("invalid config: no consumable topic"); - let topics_to_consume = [main_topic.to_owned()]; - - async move { + let handle = taskbroker::tokio::spawn(async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need // an outer select here like the other tasks. - let topic_refs: Vec<&str> = topics_to_consume.iter().map(|s| s.as_str()).collect(); + let topic_refs = [task_topic.as_str()]; start_consumer( &topic_refs, - &consumer_config.kafka_consumer_config(), + &consumer_config.kafka_consumer_config_for(&task_topic), consumer_store.clone(), processing_strategy!({ err: @@ -185,11 +183,11 @@ async fn main() -> Result<(), Error> { ), map: - deserialize::new(DeserializeConfig::from_config(&consumer_config)), + deserialize::new(DeserializeConfig::from_topic(&consumer_config, &task_topic)), reduce: ActivationBatcher::new( - ActivationBatcherConfig::from_config(&consumer_config), + ActivationBatcherConfig::from_topic(&consumer_config, &task_topic), runtime_config_manager.clone() ), ActivationWriter::new( @@ -200,8 +198,9 @@ async fn main() -> Result<(), Error> { }), ) .await - } - }); + }); + consumer_tasks.push((topic, handle)); + } // Status update flush task let (status_update_tx, status_update_task) = if config.batch_status_updates { @@ -337,10 +336,13 @@ async fn main() -> Result<(), Error> { .on_sigint() .on_signal(SignalKind::hangup()) .on_signal(SignalKind::quit()) - .on_completion(log_task_completion("consumer", consumer_task)) .on_completion(log_task_completion("upkeep_task", upkeep_task)) .on_completion(log_task_completion("maintenance_task", maintenance_task)); + for (topic, handle) in consumer_tasks { + departure = departure.on_completion(log_task_completion(format!("consumer:{topic}"), handle)); + } + if let Some(task) = grpc_server_task { departure = departure.on_completion(log_task_completion("grpc_server", task)); } diff --git a/src/test_utils.rs b/src/test_utils.rs index 31455a0f..0d13c4c8 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -354,12 +354,12 @@ pub fn create_producer(config: Arc) -> Arc { /// Reset a kafka topic by destroying it and recreating it. pub async fn reset_topic(config: Arc) { + let (main_topic, _) = config.consumable_topics().expect("no consumable topic")[0]; let admin_client: AdminClient<_> = config - .kafka_consumer_config() + .kafka_consumer_config_for(main_topic) .create() .expect("Could not create admin client"); - let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); let options = AdminOptions::default(); admin_client .delete_topics(&[main_topic, &config.kafka_deadletter_topic], &options) @@ -385,7 +385,7 @@ pub async fn consume_topic( num_records: usize, ) -> Vec { let consumer: StreamConsumer = config - .kafka_consumer_config() + .kafka_consumer_config_for(topic) .create() .expect("could not create consumer"); consumer.subscribe(&[topic]).expect("could not subscribe"); diff --git a/src/upkeep.rs b/src/upkeep.rs index e407cd10..f4812f25 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -155,13 +155,7 @@ pub async fn do_upkeep( // 1. Handle retry tasks let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { - // Use retry topic if configured, otherwise fall back to main topic - let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); - let main_topic_owned = main_topic.to_owned(); - let retry_target_topic = config - .kafka_retry_topic - .as_ref() - .unwrap_or(&main_topic_owned); + let retry_target_topic = config.retry_topic().to_owned(); // 2. Append retries to kafka let deliveries = retries @@ -322,23 +316,39 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to `runtime_config.demoted_topic` let demoted_namespaces = runtime_config.demoted_namespaces.clone(); - let (main_topic, main_topic_config) = config.consumable_topic().expect("no consumable topic"); - let main_cluster = config - .cluster(&main_topic_config.cluster) - .expect("cluster not found") - .address - .clone(); + let consumable = config.consumable_topics().expect("no consumable topic"); + // Default the forward cluster to the consumed cluster when there's exactly + // one consumed topic (legacy behavior). With multiple consumed topics there + // is no single consumed cluster, so fall back to the producer (deadletter) + // cluster, whose credentials the forward producer reuses anyway. + let default_forward_cluster = if consumable.len() == 1 { + config + .cluster(&consumable[0].1.cluster) + .expect("cluster not found") + .address + .clone() + } else { + config.kafka_producer_cluster().address.clone() + }; let forward_cluster = runtime_config .demoted_topic_cluster .clone() - .unwrap_or(main_cluster.clone()); + .unwrap_or(default_forward_cluster); let forward_topic = runtime_config .demoted_topic .clone() .unwrap_or(config.kafka_long_topic.clone()); - let same_cluster = forward_cluster == main_cluster; - let same_topic = forward_topic == main_topic; - if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) { + // Forwarding to a topic+cluster this broker itself consumes would loop the + // tasks straight back in, so treat that as a no-op. For a single consumed + // topic this is exactly the old `same_topic && same_cluster` guard. + let forwards_to_consumed = consumable.iter().any(|(name, topic_config)| { + *name == forward_topic + && config + .cluster(&topic_config.cluster) + .map(|c| c.address == forward_cluster) + .unwrap_or(false) + }); + if !(demoted_namespaces.is_empty() || forwards_to_consumed) { let forward_demoted_start = Instant::now(); // The forwarding producer reuses the deadletter cluster's credentials // (see Config::kafka_producer_config) and only overrides @@ -765,7 +775,7 @@ mod tests { assert_eq!(store.count().await.unwrap(), 1); assert_eq!(result_context.retried, 1); - let (main_topic, _) = config.consumable_topic().unwrap(); + let (main_topic, _) = config.consumable_topics().unwrap()[0]; let messages = consume_topic(config.clone(), main_topic, 1).await; assert_eq!(messages.len(), 1); let activation = &messages[0]; From 5a9b77ca4bb8dec03714f99eb68defe9684d8c85 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:33:22 +0200 Subject: [PATCH 03/10] add test --- Makefile | 5 + .../integration_tests/test_multi_topic.py | 134 ++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 integration_tests/integration_tests/test_multi_topic.py diff --git a/Makefile b/Makefile index ee63db0f..f912d4d3 100644 --- a/Makefile +++ b/Makefile @@ -69,6 +69,11 @@ test-upkeep-retry: build reset-kafka ## Run the upkeep retry integration test rm -r integration_tests/.tests_output/test_upkeep_retry .PHONY: test-upkeep-retry +test-multi-topic: build reset-kafka ## Run the multi-topic consumption integration test + python -m pytest integration_tests/integration_tests/test_multi_topic.py -s + rm -r integration_tests/.tests_output/test_multi_topic +.PHONY: test-multi-topic + test-upkeep-expiry: build reset-kafka ## Run the upkeep expiry integration test python -m pytest integration_tests/integration_tests/test_upkeep_expiry.py -s rm -r integration_tests/.tests_output/test_upkeep_expiry diff --git a/integration_tests/integration_tests/test_multi_topic.py b/integration_tests/integration_tests/test_multi_topic.py new file mode 100644 index 00000000..7b0e21e8 --- /dev/null +++ b/integration_tests/integration_tests/test_multi_topic.py @@ -0,0 +1,134 @@ +import signal +import subprocess +import time + +import pytest +import yaml + +from integration_tests.helpers import ( + TASKBROKER_BIN, + TESTS_OUTPUT_ROOT, + TaskbrokerConfig, + create_topic, + get_available_ports, + get_num_tasks_in_sqlite, + send_generic_messages_to_topic, +) + +TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_multi_topic" + + +def test_multi_topic_consumption() -> None: + """ + Verify that a single taskbroker configured (new `kafka_topics` format) with + two consumable topics consumes from BOTH into its one sqlite store. + + Each consumed topic gets its own rdkafka consumer (own group.id); both + pipelines write to the shared store. We produce N messages to each of two + topics and assert the broker ends up with 2*N tasks in sqlite. + """ + num_messages_per_topic = 1_000 + num_partitions = 4 + timeout = 60 + curr_time = int(time.time()) + + topic_a = f"multitopic-a-{curr_time}" + topic_b = f"multitopic-b-{curr_time}" + retry_topic = f"multitopic-retry-{curr_time}" + dlq_topic = f"multitopic-dlq-{curr_time}" + + # Pre-create the topics so the test exercises consumption, not topic + # creation. (create_missing_topics stays off.) + for topic in (topic_a, topic_b, retry_topic, dlq_topic): + create_topic(topic, num_partitions) + + TEST_OUTPUT_PATH.mkdir(parents=True, exist_ok=True) + db_name = f"db_multi_topic_{curr_time}" + db_path = str(TEST_OUTPUT_PATH / f"{db_name}.sqlite") + config_filename = f"config_multi_topic_{curr_time}.yml" + grpc_port = get_available_ports(1)[0] + + # New multi-topic config: two consumable topics + produce-only retry/dlq, + # all on a single cluster. + config_dict = { + "db_name": db_name, + "db_path": db_path, + "max_pending_count": 100_000, + "grpc_port": grpc_port, + "kafka_auto_offset_reset": "earliest", + "kafka_deadletter_topic": dlq_topic, + "kafka_retry_topic": retry_topic, + "kafka_clusters": { + "default": {"address": "127.0.0.1:9092"}, + }, + "kafka_topics": { + topic_a: {"cluster": "default", "consumer_group": f"{topic_a}-grp"}, + topic_b: {"cluster": "default", "consumer_group": f"{topic_b}-grp"}, + retry_topic: { + "cluster": "default", + "consumer_group": f"{retry_topic}-grp", + "produce_only": True, + }, + dlq_topic: { + "cluster": "default", + "consumer_group": f"{dlq_topic}-grp", + "produce_only": True, + }, + }, + } + + config_path = str(TEST_OUTPUT_PATH / config_filename) + with open(config_path, "w") as f: + yaml.safe_dump(config_dict, f) + + # A TaskbrokerConfig instance is only needed for the sqlite-counting helper, + # which reads db_name/db_path. + query_config = TaskbrokerConfig( + db_name=db_name, + db_path=db_path, + max_pending_count=100_000, + kafka_topic=topic_a, + kafka_deadletter_topic=dlq_topic, + kafka_consumer_group=f"{topic_a}-grp", + kafka_auto_offset_reset="earliest", + grpc_port=grpc_port, + ) + + log_path = str(TEST_OUTPUT_PATH / f"taskbroker_multi_topic_{curr_time}.log") + expected_total = num_messages_per_topic * 2 + + send_generic_messages_to_topic(topic_a, num_messages_per_topic) + send_generic_messages_to_topic(topic_b, num_messages_per_topic) + + process = None + try: + with open(log_path, "a") as log_file: + process = subprocess.Popen( + [str(TASKBROKER_BIN), "-c", config_path], + stderr=subprocess.STDOUT, + stdout=log_file, + ) + time.sleep(3) # give the broker time to start both consumers + + written = 0 + end = time.time() + timeout + while time.time() < end: + written = get_num_tasks_in_sqlite(query_config) + if written >= expected_total: + break + # the broker should still be alive while consuming + assert process.poll() is None, "taskbroker exited early" + time.sleep(1) + + assert written == expected_total, ( + f"expected {expected_total} tasks in sqlite " + f"({num_messages_per_topic} from each of two topics), got {written}" + ) + finally: + if process is not None: + process.send_signal(signal.SIGINT) + try: + assert process.wait(timeout=10) == 0 + except Exception: + process.kill() + raise From 8d8975e2109e3ececedd04dc87cb85e24c85aae2 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:35:07 +0200 Subject: [PATCH 04/10] add test to ci --- .github/workflows/ci.yml | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a34650f8..ad8520bc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -463,3 +463,38 @@ jobs: - name: Run failed tasks integration test run: | make test-failed-tasks + + multi-topic-integration-test: + name: Multi-topic integration test + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2 + + - name: Install cmake + uses: lukka/get-cmake@28983e0d3955dba2bb0a6810caae0c6cf268ec0c # latest + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # pin@v1 + with: + toolchain: stable + profile: minimal + override: true + + - uses: swatinem/rust-cache@81d053bdb0871dcd3f10763c8cc60d0adc41762b # pin@v1 + with: + key: ${{ github.job }} + + - uses: astral-sh/setup-uv@5a7eac68fb9809dea845d802897dc5c723910fa3 # v7.1.3 + with: + version: '0.8.2' + # we just cache the venv-dir directly in action-setup-venv + enable-cache: false + + - uses: getsentry/action-setup-venv@5a80476d175edf56cb205b08bc58986fa99d1725 # v3.2.0 + with: + cache-dependency-path: uv.lock + install-cmd: uv sync --frozen --only-dev --active + + - name: Run multi-topic integration test + run: | + make test-multi-topic From bdb398bf5202261a481756d6edcb658b298c98a8 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:41:16 +0200 Subject: [PATCH 05/10] add deprecation warning for raw mode too --- src/config.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/config.rs b/src/config.rs index aa26df2d..8969f3b0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -710,6 +710,9 @@ impl Config { kafka_topics with a cluster reference instead" ); } + if self.raw_mode { + warn!("raw_mode is deprecated, use kafka_topics..raw instead"); + } let topic_name = self .kafka_topic From 58af65997f68bc8c5ee6456359a235317503cc88 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:51:34 +0200 Subject: [PATCH 06/10] fix lints --- integration_tests/integration_tests/test_multi_topic.py | 1 - src/kafka/admin.rs | 5 +---- src/main.rs | 3 ++- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/integration_tests/integration_tests/test_multi_topic.py b/integration_tests/integration_tests/test_multi_topic.py index 7b0e21e8..1706b68b 100644 --- a/integration_tests/integration_tests/test_multi_topic.py +++ b/integration_tests/integration_tests/test_multi_topic.py @@ -2,7 +2,6 @@ import subprocess import time -import pytest import yaml from integration_tests.helpers import ( diff --git a/src/kafka/admin.rs b/src/kafka/admin.rs index 36f11e94..61cfee57 100644 --- a/src/kafka/admin.rs +++ b/src/kafka/admin.rs @@ -24,10 +24,7 @@ pub async fn create_missing_topics( .create() .map_err(|e| anyhow!("Unable to create rdkafka admin client: {e}"))?; - info!( - "Creating topics {:?} if they do not already exist", - topics - ); + info!("Creating topics {:?} if they do not already exist", topics); let new_topics: Vec = topics .iter() .map(|(name, partitions)| NewTopic::new(name, *partitions, TopicReplication::Fixed(1))) diff --git a/src/main.rs b/src/main.rs index 9f938ebe..ec56cb1f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -340,7 +340,8 @@ async fn main() -> Result<(), Error> { .on_completion(log_task_completion("maintenance_task", maintenance_task)); for (topic, handle) in consumer_tasks { - departure = departure.on_completion(log_task_completion(format!("consumer:{topic}"), handle)); + departure = + departure.on_completion(log_task_completion(format!("consumer:{topic}"), handle)); } if let Some(task) = grpc_server_task { From 63a26daa267ef7fd6583467cb6dba76a44c6a8b6 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 16:34:06 +0200 Subject: [PATCH 07/10] feat(metrics): Tag consumer/pipeline metrics with topic With one consumer per topic, the consumer/pipeline metrics raced (gauges) or merged across topics (counters/histograms). Add a `topic` tag to the rebalance gauges/counters, the activation writer and batcher metrics, and the deserialize payload-size histograms. Store-level metrics are left untagged. Also demote sqlite's per-consumer assign_partitions warn to debug. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/kafka/activation_batcher.rs | 31 ++++++++++++++------ src/kafka/activation_writer.rs | 44 +++++++++++++++++++++++------ src/kafka/consumer.rs | 39 +++++++++++++++++-------- src/kafka/deserialize_activation.rs | 1 + src/kafka/deserialize_raw.rs | 1 + src/main.rs | 2 +- src/store/adapters/sqlite.rs | 6 ++-- 7 files changed, 94 insertions(+), 30 deletions(-) diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index f5748f5a..8a92c1d9 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -107,15 +107,23 @@ impl Reducer for ActivationBatcher { let namespace = &t.namespace; if runtime_config.drop_task_killswitch.contains(task_name) { - metrics::counter!("filter.drop_task_killswitch", "taskname" => task_name.clone()) - .increment(1); + metrics::counter!( + "filter.drop_task_killswitch", + "topic" => self.config.kafka_topic.clone(), + "taskname" => task_name.clone(), + ) + .increment(1); return Ok(()); } if let Some(expires_at) = t.expires_at && Utc::now() > expires_at { - metrics::counter!("filter.expired_at_consumer").increment(1); + metrics::counter!( + "filter.expired_at_consumer", + "topic" => self.config.kafka_topic.clone(), + ) + .increment(1); return Ok(()); } @@ -123,6 +131,7 @@ impl Reducer for ActivationBatcher { if forward_topic == self.config.kafka_topic { metrics::counter!( "filter.forward_task_demoted_namespace.skipped", + "topic" => self.config.kafka_topic.clone(), "namespace" => namespace.clone(), "taskname" => task_name.clone(), ) @@ -130,6 +139,7 @@ impl Reducer for ActivationBatcher { } else { metrics::counter!( "filter.forward_task_demoted_namespace", + "topic" => self.config.kafka_topic.clone(), "namespace" => namespace.clone(), "taskname" => task_name.clone(), ) @@ -150,8 +160,10 @@ impl Reducer for ActivationBatcher { return Ok(None); } - metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64); - metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64); + metrics::histogram!("consumer.batch_rows", "topic" => self.config.kafka_topic.clone()) + .record(self.batch.len() as f64); + metrics::histogram!("consumer.batch_bytes", "topic" => self.config.kafka_topic.clone()) + .record(self.batch_size as f64); // Send all forward batch in parallel if !self.forward_batch.is_empty() { @@ -184,9 +196,12 @@ impl Reducer for ActivationBatcher { let results = join_all(sends).await; let success_count = results.iter().filter(|r| r.is_ok()).count(); - metrics::histogram!("consumer.forward_attempts").record(results.len() as f64); - metrics::histogram!("consumer.forward_successes").record(success_count as f64); - metrics::histogram!("consumer.forward_failures") + let topic = self.config.kafka_topic.clone(); + metrics::histogram!("consumer.forward_attempts", "topic" => topic.clone()) + .record(results.len() as f64); + metrics::histogram!("consumer.forward_successes", "topic" => topic.clone()) + .record(success_count as f64); + metrics::histogram!("consumer.forward_failures", "topic" => topic) .record((results.len() - success_count) as f64); self.forward_batch.clear(); diff --git a/src/kafka/activation_writer.rs b/src/kafka/activation_writer.rs index 4f8c4589..2f024910 100644 --- a/src/kafka/activation_writer.rs +++ b/src/kafka/activation_writer.rs @@ -16,6 +16,9 @@ use super::consumer::{ }; pub struct ActivationWriterConfig { + /// The consumed topic this writer belongs to, used as a metric tag. Each + /// consumer has its own writer, so writer metrics are per-topic. + pub topic: String, pub max_buf_len: usize, pub max_pending_activations: usize, pub max_processing_activations: usize, @@ -25,9 +28,11 @@ pub struct ActivationWriterConfig { } impl ActivationWriterConfig { - /// Convert from application configuration into ActivationWriter config. - pub fn from_config(config: &Config) -> Self { + /// Convert from application configuration into ActivationWriter config for a + /// single consumed topic. + pub fn from_topic(config: &Config, topic: &str) -> Self { Self { + topic: topic.to_owned(), db_max_size: config.db_max_size, max_buf_len: config.db_insert_batch_max_len, max_pending_activations: config.max_pending_count, @@ -133,6 +138,7 @@ impl Reducer for ActivationWriter { }; metrics::counter!( "consumer.inflight_activation_writer.backpressure", + "topic" => self.config.topic.clone(), "reason" => reason, ) .increment(1); @@ -160,11 +166,21 @@ impl Reducer for ActivationWriter { .min_by_key(|item| item.timestamp()) .unwrap(); - metrics::histogram!("consumer.inflight_activation_writer.write_to_store") - .record(write_to_store_start.elapsed()); - metrics::histogram!("consumer.inflight_activation_writer.insert_lag") - .record(lag.num_seconds() as f64); - metrics::counter!("consumer.inflight_activation_writer.stored").increment(entries); + metrics::histogram!( + "consumer.inflight_activation_writer.write_to_store", + "topic" => self.config.topic.clone(), + ) + .record(write_to_store_start.elapsed()); + metrics::histogram!( + "consumer.inflight_activation_writer.insert_lag", + "topic" => self.config.topic.clone(), + ) + .record(lag.num_seconds() as f64); + metrics::counter!( + "consumer.inflight_activation_writer.stored", + "topic" => self.config.topic.clone(), + ) + .increment(entries); debug!( "Inserted {:?} entries with max lag: {:?}s", entries, @@ -174,7 +190,11 @@ impl Reducer for ActivationWriter { } Err(err) => { error!("Unable to write to sqlite: {}", err); - metrics::counter!("consumer.inflight_activation_writer.write_failed").increment(1); + metrics::counter!( + "consumer.inflight_activation_writer.write_failed", + "topic" => self.config.topic.clone(), + ) + .increment(1); sleep(Duration::from_millis(self.config.write_failure_backoff_ms)).await; Ok(None) } @@ -216,6 +236,7 @@ mod tests { async fn test_writer_flush_batch(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, @@ -262,6 +283,7 @@ mod tests { async fn test_writer_flush_only_pending(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, @@ -297,6 +319,7 @@ mod tests { async fn test_writer_flush_only_delay(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 0, @@ -337,6 +360,7 @@ mod tests { async fn test_writer_backpressure_pending_limit_reached(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 0, @@ -386,6 +410,7 @@ mod tests { ) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, @@ -433,6 +458,7 @@ mod tests { async fn test_writer_backpressure_processing_limit_reached(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, @@ -502,6 +528,7 @@ mod tests { async fn test_writer_backpressure_db_size_limit_reached(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), // 200 rows is ~50KB db_max_size: Some(50_000), max_buf_len: 100, @@ -534,6 +561,7 @@ mod tests { async fn test_writer_flush_empty_batch(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index aee767b1..3d445a37 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -41,7 +41,10 @@ pub async fn start_consumer( ) -> Result<(), Error> { let (client_shutdown_sender, client_shutdown_receiver) = oneshot::channel(); let (event_sender, event_receiver) = unbounded_channel(); - let context = KafkaContext::new(event_sender.clone()); + // Each consumer subscribes to a single topic; join defensively in case that + // ever changes. Used as the `topic` tag on this consumer's metrics. + let topic = topics.join(","); + let context = KafkaContext::new(event_sender.clone(), topic.clone()); let consumer: Arc> = Arc::new( kafka_client_config .create_with_context(context) @@ -54,13 +57,14 @@ pub async fn start_consumer( handle_shutdown_signals(event_sender.clone()); poll_consumer_client(consumer.clone(), client_shutdown_receiver); - metrics::gauge!("arroyo.consumer.current_partitions").set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); handle_events( consumer, event_receiver, activation_store, client_shutdown_sender, spawn_actors, + topic, ) .await } @@ -109,11 +113,16 @@ pub fn poll_consumer_client( #[derive(Debug)] pub struct KafkaContext { event_sender: UnboundedSender<(Event, SyncSender<()>)>, + /// The topic(s) this consumer is subscribed to, used as a metric tag. + topic: String, } impl KafkaContext { - pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>) -> Self { - Self { event_sender } + pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topic: String) -> Self { + Self { + event_sender, + topic, + } } } @@ -140,8 +149,11 @@ impl ConsumerContext for KafkaContext { info!("Partition assignment event sent, waiting for rendezvous..."); let _ = rendezvous_receiver.recv(); info!("Rendezvous complete"); - metrics::counter!("arroyo.consumer.partitions_assigned.count") - .increment(tpl.count() as u64); + metrics::counter!( + "arroyo.consumer.partitions_assigned.count", + "topic" => self.topic.clone(), + ) + .increment(tpl.count() as u64); } Rebalance::Revoke(tpl) => { debug!("Got pre-rebalance callback, kind: Revoke"); @@ -159,8 +171,11 @@ impl ConsumerContext for KafkaContext { info!("Partition revocation event sent, waiting for rendezvous..."); let _ = rendezvous_receiver.recv(); info!("Rendezvous complete"); - metrics::counter!("arroyo.consumer.partitions_revoked.count") - .increment(tpl.count() as u64); + metrics::counter!( + "arroyo.consumer.partitions_revoked.count", + "topic" => self.topic.clone(), + ) + .increment(tpl.count() as u64); } Rebalance::Error(err) => { debug!("Got pre-rebalance callback, kind: Error"); @@ -337,6 +352,7 @@ pub async fn handle_events( Arc>, &BTreeSet<(String, i32)>, ) -> ActorHandles, + topic: String, ) -> Result<(), anyhow::Error> { const CALLBACK_DURATION: Duration = Duration::from_secs(4); @@ -363,7 +379,8 @@ pub async fn handle_events( info!("Received event: {:?}", event); state = match (state, event) { (ConsumerState::Ready, Event::Assign(tpl)) => { - metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()) + .set(tpl.len() as f64); // Note: This assumes we only process one topic per consumer. let mut partitions = Vec::::new(); for (_, partition) in tpl.iter() { @@ -386,7 +403,7 @@ pub async fn handle_events( ); activation_store.assign_partitions(vec![]).unwrap(); handles.shutdown(CALLBACK_DURATION).await; - metrics::gauge!("arroyo.consumer.current_partitions").set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); ConsumerState::Ready } (ConsumerState::Consuming(handles, _), Event::Shutdown) => { @@ -394,7 +411,7 @@ pub async fn handle_events( handles.shutdown(CALLBACK_DURATION).await; debug!("Signaling shutdown to client..."); shutdown_client.take(); - metrics::gauge!("arroyo.consumer.current_partitions").set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); ConsumerState::Stopped } (ConsumerState::Stopped, _) => { diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 332c727e..9ab1d11e 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -48,6 +48,7 @@ pub fn new( metrics::histogram!( "consumer.message.payload_size_bytes", + "topic" => msg.topic().to_string(), "namespace" => namespace.clone(), "taskname" => taskname.clone() ) diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 34f1b505..40c53a02 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -143,6 +143,7 @@ pub fn new(config: RawConfig) -> impl Fn(Arc) -> Result msg.topic().to_string(), "namespace" => config.namespace.clone(), "taskname" => config.taskname.clone() ) diff --git a/src/main.rs b/src/main.rs index ec56cb1f..93f971d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -192,7 +192,7 @@ async fn main() -> Result<(), Error> { ), ActivationWriter::new( consumer_store.clone(), - ActivationWriterConfig::from_config(&consumer_config) + ActivationWriterConfig::from_topic(&consumer_config, &task_topic) ), }), diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 613abd8d..4bc53971 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -22,7 +22,7 @@ use libsqlite3_sys::{ }; use prost::Message; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; -use tracing::{instrument, warn}; +use tracing::{debug, instrument, warn}; use crate::config::Config; use crate::store::activation::{Activation, ActivationStatus}; @@ -436,7 +436,9 @@ impl ActivationStore for SqliteStore { } fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { - warn!("assign_partitions: {:?}", partitions); + // sqlite owns its whole DB regardless of partition assignment, so this + // is a no-op. Fires once per consumer, hence debug rather than warn. + debug!("assign_partitions: {:?}", partitions); Ok(()) } From 21e1f4a65312887cdd9b762833c0649de32cdfe4 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 16:34:06 +0200 Subject: [PATCH 08/10] fix(config): Validate raw-mode topic fields at config time A topic with raw mode but missing namespace/application/taskname/ processing_deadline_duration passed config validation and only panicked later when the consumer built its deserializer (via .expect()). Validate completeness (and that the application is in worker_map, and the retry topic differs from the raw topic) in normalize_and_validate so it's a clean config error instead. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 106 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 103 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index 8969f3b0..b1d46ca1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -946,6 +946,47 @@ impl Config { )); } + // Validate raw-mode topics up front. The raw fields are optional on + // `RawModeConfig` (they don't apply to non-raw topics), but a raw topic + // requires all of them. Catch missing fields here as a config error + // rather than panicking when the consumer builds its deserializer. + for (topic_name, topic_config) in &self.kafka_topics { + let Some(raw) = &topic_config.raw else { + continue; + }; + let application = raw.application.as_deref().ok_or_else(|| { + anyhow!("topic '{topic_name}' enables raw mode but is missing raw.application") + })?; + if !self.worker_map.contains_key(application) { + return Err(anyhow!( + "topic '{topic_name}' raw.application '{application}' is not in worker_map" + )); + } + if raw.namespace.is_none() { + return Err(anyhow!( + "topic '{topic_name}' enables raw mode but is missing raw.namespace" + )); + } + if raw.taskname.is_none() { + return Err(anyhow!( + "topic '{topic_name}' enables raw mode but is missing raw.taskname" + )); + } + if raw.processing_deadline_duration.is_none() { + return Err(anyhow!( + "topic '{topic_name}' enables raw mode but is missing \ + raw.processing_deadline_duration" + )); + } + // Raw messages aren't activations, so retries must go to a separate + // activation-encoded topic, never back to the raw topic itself. + if self.kafka_retry_topic.as_deref() == Some(topic_name.as_str()) { + return Err(anyhow!( + "kafka_retry_topic must differ from raw topic '{topic_name}'" + )); + } + } + Ok(()) } @@ -1578,12 +1619,18 @@ mod tests { kafka_deadletter_topic: profiles-dlq kafka_retry_topic: profiles-retry +worker_map: + profiles: http://worker-profiles:50052 + kafka_topics: profiles: cluster: profiles-cluster consumer_group: taskbroker-profiles raw: + namespace: profiles application: profiles + taskname: profiles.process + processing_deadline_duration: 30 profiles-retry: cluster: profiles-cluster consumer_group: taskbroker-profiles-retry @@ -1614,10 +1661,10 @@ kafka_clusters: assert_eq!( profiles.raw, Some(RawModeConfig { - namespace: None, + namespace: Some("profiles".to_owned()), application: Some("profiles".to_owned()), - taskname: None, - processing_deadline_duration: None, + taskname: Some("profiles.process".to_owned()), + processing_deadline_duration: Some(30), }) ); @@ -1655,6 +1702,59 @@ kafka_clusters: }); } + /// A raw topic missing a required field must be rejected at config time with + /// a clear error, not panic later when the consumer builds its deserializer. + #[test] + fn test_raw_mode_missing_field_rejected_cleanly() { + Jail::expect_with(|jail| { + // raw is missing `namespace` (application/taskname/deadline present). + jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry + +worker_map: + profiles: http://worker-profiles:50052 + +kafka_topics: + profiles: + cluster: profiles-cluster + consumer_group: taskbroker-profiles + raw: + application: profiles + taskname: profiles.process + processing_deadline_duration: 30 + profiles-retry: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-retry + produce_only: true + profiles-dlq: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + profiles-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + // A clean error, not a panic. + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("raw.namespace"), + "expected a clean missing-field error, got: {}", + err + ); + + Ok(()) + }); + } + #[test] fn test_multi_topic_config_from_env() { Jail::expect_with(|jail| { From 6a9f779c92bb81ac1064d653dff48d056aaf5757 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 16:45:35 +0200 Subject: [PATCH 09/10] fix(upkeep): Default demoted forwarding to the deadletter cluster The forwarding producer authenticates against the deadletter cluster and only overrides bootstrap.servers, so the deadletter cluster is the only target where its credentials reliably work. The consumer-side batcher and the upkeep path disagreed on the default forward cluster when demoted_topic_cluster is unset: the batcher used each topic's own cluster while upkeep used the deadletter cluster for multi-topic. Since the demoted topic is a single global topic, default both to the deadletter cluster. Unchanged for legacy single-topic, where the deadletter cluster address defaults to the consumed cluster's address. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/kafka/activation_batcher.rs | 28 ++++++++++++++-------------- src/upkeep.rs | 20 ++++++-------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index 8a92c1d9..9e608c5f 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -19,7 +19,6 @@ use super::consumer::{ pub struct ActivationBatcherConfig { pub producer_config: ClientConfig, - pub kafka_cluster: String, pub kafka_topic: String, pub kafka_long_topic: String, pub send_timeout_ms: u64, @@ -33,17 +32,8 @@ impl ActivationBatcherConfig { /// single consumed topic. Each consumer has its own batcher, so the topic is /// passed explicitly rather than derived from "the" consumable topic. pub fn from_topic(config: &Config, topic_name: &str) -> Self { - let topic_config = config - .kafka_topics - .get(topic_name) - .unwrap_or_else(|| panic!("unknown topic '{topic_name}'")); - let cluster = config - .cluster(&topic_config.cluster) - .expect("cluster not found"); - Self { producer_config: config.kafka_producer_config(), - kafka_cluster: cluster.address.clone(), kafka_topic: topic_name.to_owned(), kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, @@ -168,10 +158,20 @@ impl Reducer for ActivationBatcher { // Send all forward batch in parallel if !self.forward_batch.is_empty() { let runtime_config = self.runtime_config_manager.read().await; - let forward_cluster = runtime_config - .demoted_topic_cluster - .clone() - .unwrap_or(self.config.kafka_cluster.clone()); + // The forwarding producer authenticates against the deadletter + // cluster, so default demoted forwarding there too (and consistently + // with upkeep) when no demoted_topic_cluster is configured. + let forward_cluster = + runtime_config + .demoted_topic_cluster + .clone() + .unwrap_or_else(|| { + self.config + .producer_config + .get("bootstrap.servers") + .expect("producer config always sets bootstrap.servers") + .to_string() + }); if self.producer_cluster != forward_cluster { let mut new_config = self.config.producer_config.clone(); new_config.set("bootstrap.servers", &forward_cluster); diff --git a/src/upkeep.rs b/src/upkeep.rs index f4812f25..cad5e785 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -317,23 +317,15 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to `runtime_config.demoted_topic` let demoted_namespaces = runtime_config.demoted_namespaces.clone(); let consumable = config.consumable_topics().expect("no consumable topic"); - // Default the forward cluster to the consumed cluster when there's exactly - // one consumed topic (legacy behavior). With multiple consumed topics there - // is no single consumed cluster, so fall back to the producer (deadletter) - // cluster, whose credentials the forward producer reuses anyway. - let default_forward_cluster = if consumable.len() == 1 { - config - .cluster(&consumable[0].1.cluster) - .expect("cluster not found") - .address - .clone() - } else { - config.kafka_producer_cluster().address.clone() - }; + // The forward producer authenticates against the deadletter cluster, so + // default demoted forwarding there too (and consistently with the + // consumer-side batcher) when no demoted_topic_cluster is configured. For + // legacy single-topic configs the deadletter cluster address defaults to the + // consumed cluster's address, so this is unchanged there. let forward_cluster = runtime_config .demoted_topic_cluster .clone() - .unwrap_or(default_forward_cluster); + .unwrap_or_else(|| config.kafka_producer_cluster().address.clone()); let forward_topic = runtime_config .demoted_topic .clone() From 27241309cb5d4c5934945d1acbb55246437f6fbd Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 16:51:30 +0200 Subject: [PATCH 10/10] docs: Clarify demoted forward-cluster default Follow-up to the forward-cluster fix: the upkeep comment overstated that legacy behavior is "unchanged" (it only is when kafka_deadletter_cluster is unset), and the demoted_topic_cluster doc still claimed it defaults to the consumed cluster. Correct both to say it defaults to the deadletter cluster. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/runtime_config.rs | 3 ++- src/upkeep.rs | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/runtime_config.rs b/src/runtime_config.rs index 344a0e07..83dae1e7 100644 --- a/src/runtime_config.rs +++ b/src/runtime_config.rs @@ -16,7 +16,8 @@ pub struct RuntimeConfig { /// to prevent them from blocking other tasks in shared namespaces. pub demoted_namespaces: Vec, /// The cluster to forward tasks from demoted namespaces to. - /// If not set, the current cluster taskbroker is consuming from will be used. + /// If not set, the deadletter cluster is used (the forwarding producer + /// authenticates against it, so it is the only reliably reachable default). pub demoted_topic_cluster: Option, /// The topic to forward tasks from demoted namespaces to. /// If not set, the taskworker-long topic will be used diff --git a/src/upkeep.rs b/src/upkeep.rs index cad5e785..d2a069f3 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -320,8 +320,9 @@ pub async fn do_upkeep( // The forward producer authenticates against the deadletter cluster, so // default demoted forwarding there too (and consistently with the // consumer-side batcher) when no demoted_topic_cluster is configured. For - // legacy single-topic configs the deadletter cluster address defaults to the - // consumed cluster's address, so this is unchanged there. + // legacy configs where kafka_deadletter_cluster is unset the deadletter + // cluster address defaults to the consumed cluster's address, so this is + // unchanged there; it only differs when a distinct deadletter cluster is set. let forward_cluster = runtime_config .demoted_topic_cluster .clone()