diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a34650f8..ad8520bc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -463,3 +463,38 @@ jobs: - name: Run failed tasks integration test run: | make test-failed-tasks + + multi-topic-integration-test: + name: Multi-topic integration test + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2 + + - name: Install cmake + uses: lukka/get-cmake@28983e0d3955dba2bb0a6810caae0c6cf268ec0c # latest + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # pin@v1 + with: + toolchain: stable + profile: minimal + override: true + + - uses: swatinem/rust-cache@81d053bdb0871dcd3f10763c8cc60d0adc41762b # pin@v1 + with: + key: ${{ github.job }} + + - uses: astral-sh/setup-uv@5a7eac68fb9809dea845d802897dc5c723910fa3 # v7.1.3 + with: + version: '0.8.2' + # we just cache the venv-dir directly in action-setup-venv + enable-cache: false + + - uses: getsentry/action-setup-venv@5a80476d175edf56cb205b08bc58986fa99d1725 # v3.2.0 + with: + cache-dependency-path: uv.lock + install-cmd: uv sync --frozen --only-dev --active + + - name: Run multi-topic integration test + run: | + make test-multi-topic diff --git a/Makefile b/Makefile index ee63db0f..f912d4d3 100644 --- a/Makefile +++ b/Makefile @@ -69,6 +69,11 @@ test-upkeep-retry: build reset-kafka ## Run the upkeep retry integration test rm -r integration_tests/.tests_output/test_upkeep_retry .PHONY: test-upkeep-retry +test-multi-topic: build reset-kafka ## Run the multi-topic consumption integration test + python -m pytest integration_tests/integration_tests/test_multi_topic.py -s + rm -r integration_tests/.tests_output/test_multi_topic +.PHONY: test-multi-topic + test-upkeep-expiry: build reset-kafka ## Run the upkeep expiry integration test python -m pytest integration_tests/integration_tests/test_upkeep_expiry.py -s rm -r integration_tests/.tests_output/test_upkeep_expiry diff 
--git a/integration_tests/integration_tests/test_multi_topic.py b/integration_tests/integration_tests/test_multi_topic.py
new file mode 100644
index 00000000..1706b68b
--- /dev/null
+++ b/integration_tests/integration_tests/test_multi_topic.py
@@ -0,0 +1,143 @@
+import signal
+import subprocess
+import time
+
+import yaml
+
+from integration_tests.helpers import (
+    TASKBROKER_BIN,
+    TESTS_OUTPUT_ROOT,
+    TaskbrokerConfig,
+    create_topic,
+    get_available_ports,
+    get_num_tasks_in_sqlite,
+    send_generic_messages_to_topic,
+)
+
+TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_multi_topic"
+
+
+def test_multi_topic_consumption() -> None:
+    """
+    Verify that a single taskbroker configured (new `kafka_topics` format) with
+    two consumable topics consumes from BOTH into its one sqlite store.
+
+    Each consumed topic gets its own rdkafka consumer (own group.id); both
+    pipelines write to the shared store. We produce N messages to each of two
+    topics and assert the broker ends up with 2*N tasks in sqlite.
+    """
+    num_messages_per_topic = 1_000
+    num_partitions = 4
+    timeout = 60
+    curr_time = int(time.time())
+
+    topic_a = f"multitopic-a-{curr_time}"
+    topic_b = f"multitopic-b-{curr_time}"
+    retry_topic = f"multitopic-retry-{curr_time}"
+    dlq_topic = f"multitopic-dlq-{curr_time}"
+
+    # Pre-create the topics so the test exercises consumption, not topic
+    # creation. (create_missing_topics stays off.)
+    for topic in (topic_a, topic_b, retry_topic, dlq_topic):
+        create_topic(topic, num_partitions)
+
+    TEST_OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
+    db_name = f"db_multi_topic_{curr_time}"
+    db_path = str(TEST_OUTPUT_PATH / f"{db_name}.sqlite")
+    config_filename = f"config_multi_topic_{curr_time}.yml"
+    grpc_port = get_available_ports(1)[0]
+
+    # New multi-topic config: two consumable topics + produce-only retry/dlq,
+    # all on a single cluster.
+    config_dict = {
+        "db_name": db_name,
+        "db_path": db_path,
+        "max_pending_count": 100_000,
+        "grpc_port": grpc_port,
+        "kafka_auto_offset_reset": "earliest",
+        "kafka_deadletter_topic": dlq_topic,
+        "kafka_retry_topic": retry_topic,
+        "kafka_clusters": {
+            "default": {"address": "127.0.0.1:9092"},
+        },
+        "kafka_topics": {
+            topic_a: {"cluster": "default", "consumer_group": f"{topic_a}-grp"},
+            topic_b: {"cluster": "default", "consumer_group": f"{topic_b}-grp"},
+            retry_topic: {
+                "cluster": "default",
+                "consumer_group": f"{retry_topic}-grp",
+                "produce_only": True,
+            },
+            dlq_topic: {
+                "cluster": "default",
+                "consumer_group": f"{dlq_topic}-grp",
+                "produce_only": True,
+            },
+        },
+    }
+
+    config_path = str(TEST_OUTPUT_PATH / config_filename)
+    with open(config_path, "w") as f:
+        yaml.safe_dump(config_dict, f)
+
+    # A TaskbrokerConfig instance is only needed for the sqlite-counting helper,
+    # which reads db_name/db_path.
+    query_config = TaskbrokerConfig(
+        db_name=db_name,
+        db_path=db_path,
+        max_pending_count=100_000,
+        kafka_topic=topic_a,
+        kafka_deadletter_topic=dlq_topic,
+        kafka_consumer_group=f"{topic_a}-grp",
+        kafka_auto_offset_reset="earliest",
+        grpc_port=grpc_port,
+    )
+
+    log_path = str(TEST_OUTPUT_PATH / f"taskbroker_multi_topic_{curr_time}.log")
+    expected_total = num_messages_per_topic * 2
+
+    send_generic_messages_to_topic(topic_a, num_messages_per_topic)
+    send_generic_messages_to_topic(topic_b, num_messages_per_topic)
+
+    process = None
+    exit_code = None
+    try:
+        with open(log_path, "a") as log_file:
+            process = subprocess.Popen(
+                [str(TASKBROKER_BIN), "-c", config_path],
+                stderr=subprocess.STDOUT,
+                stdout=log_file,
+            )
+            time.sleep(3)  # give the broker time to start both consumers
+
+            written = 0
+            end = time.time() + timeout
+            while time.time() < end:
+                written = get_num_tasks_in_sqlite(query_config)
+                if written >= expected_total:
+                    break
+                # the broker should still be alive while consuming
+                assert process.poll() is None, "taskbroker exited early"
+                time.sleep(1)
+
+        assert written == expected_total, (
+            f"expected {expected_total} tasks in sqlite "
+            f"({num_messages_per_topic} from each of two topics), got {written}"
+        )
+    finally:
+        if process is not None:
+            # Popen.send_signal() does nothing if the process already completed,
+            # so this is safe even when the broker exited early.
+            process.send_signal(signal.SIGINT)
+            try:
+                exit_code = process.wait(timeout=10)
+            except subprocess.TimeoutExpired:
+                # The broker ignored SIGINT: kill it and reap it so no zombie
+                # outlives the test, then surface the hang.
+                process.kill()
+                process.wait()
+                raise
+
+    # Checked outside `finally` so a non-zero exit code cannot replace a more
+    # specific assertion failure raised above.
+    assert exit_code == 0, f"taskbroker exited with code {exit_code}, see {log_path}"
diff --git a/src/config.rs b/src/config.rs
index 5231b2d4..b1d46ca1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -89,6 +89,35 @@ impl ClusterConfig {
             || self.ssl_certificate_location.is_some()
             || self.ssl_key_location.is_some()
     }
+
+    /// Apply this cluster's `bootstrap.servers` and any configured sasl/ssl
+    /// auth onto an rdkafka `ClientConfig`. Shared by the consumer, producer and
+    /// admin config builders so they all authenticate identically.
+    fn apply_to(&self, config: &mut ClientConfig) {
+        config.set("bootstrap.servers", self.address.clone());
+
+        if let Some(ref sasl_mechanism) = self.sasl_mechanism {
+            config.set("sasl.mechanism", sasl_mechanism);
+        }
+        if let Some(ref sasl_username) = self.sasl_username {
+            config.set("sasl.username", sasl_username);
+        }
+        if let Some(ref sasl_password) = self.sasl_password {
+            config.set("sasl.password", sasl_password);
+        }
+        if let Some(ref security_protocol) = self.security_protocol {
+            config.set("security.protocol", security_protocol);
+        }
+        if let Some(ref ssl_ca_location) = self.ssl_ca_location {
+            config.set("ssl.ca.location", ssl_ca_location);
+        }
+        if let Some(ref ssl_certificate_location) = self.ssl_certificate_location {
+            config.set("ssl.certificate.location", ssl_certificate_location);
+        }
+        if let Some(ref ssl_private_key_location) = self.ssl_key_location {
+            config.set("ssl.key.location", ssl_private_key_location);
+        }
+    }
 }
 
 #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)]
@@ -618,8 +647,9 @@ impl Config {
         let uses_new_format = !self.kafka_topics.is_empty() || !self.kafka_clusters.is_empty();
 
         // Any explicitly-set deprecated field
describing a cluster (the main - // consumed cluster or the deadletter cluster). kafka_deadletter_topic is - // NOT deprecated and is intentionally excluded. + // consumed cluster or the deadletter cluster) or the deprecated global + // raw mode. kafka_deadletter_topic is NOT deprecated and is intentionally + // excluded. let uses_legacy = self.kafka_topic.is_some() || self.kafka_cluster.is_some() || self.kafka_consumer_group.is_some() @@ -637,7 +667,10 @@ impl Config { || self.kafka_deadletter_sasl_password.is_some() || self.kafka_deadletter_ssl_ca_location.is_some() || self.kafka_deadletter_ssl_certificate_location.is_some() - || self.kafka_deadletter_ssl_key_location.is_some(); + || self.kafka_deadletter_ssl_key_location.is_some() + // Global raw mode is a deprecated legacy field; in the new format raw + // mode is configured per topic via kafka_topics..raw. + || self.raw_mode; if uses_new_format && uses_legacy { return Err(anyhow!( @@ -677,6 +710,9 @@ impl Config { kafka_topics with a cluster reference instead" ); } + if self.raw_mode { + warn!("raw_mode is deprecated, use kafka_topics..raw instead"); + } let topic_name = self .kafka_topic @@ -819,8 +855,25 @@ impl Config { })?; } - // Validate exactly one consumable topic - self.consumable_topic()?; + // Validate at least one consumable topic. + let consumable = self.consumable_topics()?; + + // Multi-topic consumption is only supported on the sqlite adapter for + // now. The postgres adapter filters claims by a single shared partition + // list, but those partition numbers aren't unique across topics, so the + // filter would mix partitions from different topics together. Note this + // filtering exists only to avoid lock contention between brokers, not for + // correctness; supporting multi-topic on postgres means reworking how we + // avoid that contention (e.g. filtering by (topic, partition) or another + // mechanism entirely). Reject the combination here, before any consumer + // spawns. 
+ if consumable.len() > 1 && self.database_adapter == DatabaseAdapter::Postgres { + return Err(anyhow!( + "multi-topic consumption ({} consumable topics) is not supported with the \ + postgres database adapter; use the sqlite adapter or a single consumable topic", + consumable.len() + )); + } // The deadletter topic must be a declared topic so the producer can // resolve its cluster. In legacy mode it was added above; in the new @@ -840,8 +893,39 @@ impl Config { // would be published to the wrong brokers. In the legacy format the // retry topic is registered above; in the new format the user must // declare it in kafka_topics. - let (consumed_topic, _) = self.consumable_topic()?; - let retry_target = self.kafka_retry_topic.as_deref().unwrap_or(consumed_topic); + // Normalize the retry topic so downstream code can always rely on it + // being set: + // - explicitly configured: used as-is. + // - single non-raw consumed topic: retries loop back to that topic. + // - raw mode: raw messages aren't activations, so retries must go to a + // separate activation-encoded topic; a retry topic is mandatory. + // - multiple consumed topics: no unambiguous fallback, so a single + // shared retry topic is mandatory. 
+ if self.kafka_retry_topic.is_none() { + let has_raw = consumable.iter().any(|(_, cfg)| cfg.raw.is_some()); + let count = consumable.len(); + let single_topic = (count == 1).then(|| consumable[0].0.to_owned()); + + if has_raw { + return Err(anyhow!( + "kafka_retry_topic must be set explicitly when a consumed topic uses raw mode" + )); + } + match single_topic { + Some(topic) => self.kafka_retry_topic = Some(topic), + None => { + return Err(anyhow!( + "kafka_retry_topic is required when consuming from multiple topics ({} \ + consumable topics configured)", + count + )); + } + } + } + let retry_target = self + .kafka_retry_topic + .as_deref() + .expect("kafka_retry_topic is set above"); let retry_topic_config = self.kafka_topics.get(retry_target).ok_or_else(|| { Box::new(figment::Error::from(format!( "kafka_retry_topic '{retry_target}' is not defined in kafka_topics" @@ -862,36 +946,80 @@ impl Config { )); } + // Validate raw-mode topics up front. The raw fields are optional on + // `RawModeConfig` (they don't apply to non-raw topics), but a raw topic + // requires all of them. Catch missing fields here as a config error + // rather than panicking when the consumer builds its deserializer. 
+        for (topic_name, topic_config) in &self.kafka_topics {
+            let Some(raw) = &topic_config.raw else {
+                continue;
+            };
+            let application = raw.application.as_deref().ok_or_else(|| {
+                anyhow!("topic '{topic_name}' enables raw mode but is missing raw.application")
+            })?;
+            if !self.worker_map.contains_key(application) {
+                return Err(anyhow!(
+                    "topic '{topic_name}' raw.application '{application}' is not in worker_map"
+                ));
+            }
+            if raw.namespace.is_none() {
+                return Err(anyhow!(
+                    "topic '{topic_name}' enables raw mode but is missing raw.namespace"
+                ));
+            }
+            if raw.taskname.is_none() {
+                return Err(anyhow!(
+                    "topic '{topic_name}' enables raw mode but is missing raw.taskname"
+                ));
+            }
+            if raw.processing_deadline_duration.is_none() {
+                return Err(anyhow!(
+                    "topic '{topic_name}' enables raw mode but is missing \
+                     raw.processing_deadline_duration"
+                ));
+            }
+            // Raw messages aren't activations, so retries must go to a separate
+            // activation-encoded topic, never back to the raw topic itself.
+            if self.kafka_retry_topic.as_deref() == Some(topic_name.as_str()) {
+                return Err(anyhow!(
+                    "kafka_retry_topic must differ from raw topic '{topic_name}'"
+                ));
+            }
+        }
+
         Ok(())
     }
 
-    /// Get the single consumable topic and its config.
-    /// Returns an error if there are zero or multiple consumable topics.
-    pub fn consumable_topic(&self) -> Result<(&str, &TopicConfig), Box<figment::Error>> {
-        let mut consumable = self
+    /// Get all consumable (non-`produce_only`) topics and their configs, in
+    /// `kafka_topics` (BTreeMap) order. Returns an error only when there are no
+    /// consumable topics. This is the sole accessor for consumed topics; callers
+    /// that only handle a single topic should iterate and select explicitly
+    /// rather than assuming exactly one.
+    pub fn consumable_topics(&self) -> Result<Vec<(&str, &TopicConfig)>, Box<figment::Error>> {
+        let consumable: Vec<(&str, &TopicConfig)> = self
             .kafka_topics
             .iter()
-            .filter(|(_, cfg)| !cfg.produce_only);
+            .filter(|(_, cfg)| !cfg.produce_only)
+            .map(|(name, cfg)| (name.as_str(), cfg))
+            .collect();
 
-        let first = consumable.next().ok_or_else(|| {
-            Box::new(figment::Error::from(
+        if consumable.is_empty() {
+            return Err(Box::new(figment::Error::from(
                 "no consumable topic configured (all topics have produce_only: true)".to_owned(),
-            ))
-        })?;
-
-        if consumable.next().is_some() {
-            let count = self
-                .kafka_topics
-                .values()
-                .filter(|t| !t.produce_only)
-                .count();
-            return Err(Box::new(figment::Error::from(format!(
-                "multi-topic consumption is not yet supported: {} consumable topics configured, maximum is 1",
-                count
-            ))));
+            )));
         }
 
-        Ok((first.0.as_str(), first.1))
+        Ok(consumable)
+    }
+
+    /// The topic retries are produced to. `normalize_and_validate` always sets
+    /// `kafka_retry_topic` (it defaults to the single consumed topic when only
+    /// one non-raw topic is configured), so this is infallible after validation.
+    /// Panics if config wasn't validated (call from_args, not extract directly).
+    pub fn retry_topic(&self) -> &str {
+        self.kafka_retry_topic
+            .as_deref()
+            .expect("kafka_retry_topic unset - was config validated?")
     }
 
     /// Get cluster config by name.
@@ -902,13 +1030,15 @@ impl Config {
             .ok_or_else(|| Box::new(figment::Error::from(format!("unknown cluster: {}", name))))
     }
 
-    /// Convert the application Config into rdkafka::ClientConfig for consumer.
-    /// Uses the single consumable topic's cluster.
-    /// Panics if config wasn't validated (call from_args, not extract directly).
-    pub fn kafka_consumer_config(&self) -> ClientConfig {
-        let (_, topic_config) = self
-            .consumable_topic()
-            .expect("consumable_topic failed - was config validated?");
+    /// Convert the application Config into rdkafka::ClientConfig for the consumer
+    /// of a specific topic.
Each consumed topic has its own consumer (own + /// `group.id` and cluster), so multi-topic spawns one consumer per topic. + /// Panics if `topic_name` isn't a declared topic (call from_args first). + pub fn kafka_consumer_config_for(&self, topic_name: &str) -> ClientConfig { + let topic_config = self + .kafka_topics + .get(topic_name) + .unwrap_or_else(|| panic!("unknown topic '{topic_name}' - was config validated?")); let cluster = self .cluster(&topic_config.cluster) .expect("cluster lookup failed - was config validated?"); @@ -925,9 +1055,9 @@ impl Config { .clone() .unwrap_or_else(|| self.kafka_auto_offset_reset.clone()); - let mut new_config = ClientConfig::new(); - let config = new_config - .set("bootstrap.servers", cluster.address.clone()) + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config .set("group.id", topic_config.consumer_group.clone()) .set("session.timeout.ms", session_timeout_ms.to_string()) .set("enable.partition.eof", "false") @@ -939,29 +1069,21 @@ impl Config { .set("auto.offset.reset", auto_offset_reset) .set("enable.auto.offset.store", "false"); - if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { - config.set("sasl.mechanism", sasl_mechanism); - } - if let Some(ref sasl_username) = cluster.sasl_username { - config.set("sasl.username", sasl_username); - } - if let Some(ref sasl_password) = cluster.sasl_password { - config.set("sasl.password", sasl_password); - } - if let Some(ref security_protocol) = cluster.security_protocol { - config.set("security.protocol", security_protocol); - } - if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { - config.set("ssl.ca.location", ssl_ca_location); - } - if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { - config.set("ssl.certificate.location", ssl_certificate_location); - } - if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { - config.set("ssl.key.location", ssl_private_key_location); - } + config + } + + /// 
Build an rdkafka::ClientConfig for an admin client on a named cluster. + /// Carries only `bootstrap.servers` + that cluster's auth (no consumer + /// settings), so topic creation targets the correct brokers. + /// Panics if the cluster isn't declared (call from_args first). + pub fn kafka_admin_config(&self, cluster_name: &str) -> ClientConfig { + let cluster = self + .cluster(cluster_name) + .expect("cluster lookup failed - was config validated?"); - config.clone() + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config } /// The cluster the deadletter / forwarding producer connects to: the @@ -985,33 +1107,10 @@ impl Config { pub fn kafka_producer_config(&self) -> ClientConfig { let cluster = self.kafka_producer_cluster(); - let mut new_config = ClientConfig::new(); - let config = new_config - .set("bootstrap.servers", cluster.address.clone()) - .set("message.max.bytes", format!("{}", self.max_message_size)); - if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { - config.set("sasl.mechanism", sasl_mechanism); - } - if let Some(ref sasl_username) = cluster.sasl_username { - config.set("sasl.username", sasl_username); - } - if let Some(ref sasl_password) = cluster.sasl_password { - config.set("sasl.password", sasl_password); - } - if let Some(ref security_protocol) = cluster.security_protocol { - config.set("security.protocol", security_protocol); - } - if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { - config.set("ssl.ca.location", ssl_ca_location); - } - if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { - config.set("ssl.certificate.location", ssl_certificate_location); - } - if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { - config.set("ssl.key.location", ssl_private_key_location); - } - - config.clone() + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config.set("message.max.bytes", format!("{}", self.max_message_size)); + config } } @@ 
-1202,7 +1301,7 @@ mod tests { // kafka_consumer_group is unset in the yaml, so the legacy field // stays None and normalization applies the "taskworker" default. assert_eq!(config.kafka_consumer_group, None); - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "error-tasks"); assert_eq!(topic_config.consumer_group, "taskworker"); assert_eq!(config.kafka_auto_offset_reset, "earliest".to_owned()); @@ -1264,7 +1363,7 @@ mod tests { // Zero-config: legacy fields stay None, but normalization applies // the historical "taskworker" default as the consumable topic. assert_eq!(config.kafka_topic, None); - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "taskworker"); assert_eq!( config.cluster(&topic_config.cluster).unwrap().address, @@ -1317,7 +1416,7 @@ mod tests { fn test_kafka_consumer_config() { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("bootstrap.servers").unwrap(), @@ -1337,7 +1436,7 @@ mod tests { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("security.protocol").unwrap(), @@ -1372,7 +1471,7 @@ mod tests { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("ssl.ca.location").unwrap(), @@ -1518,13 +1617,20 @@ mod tests { "config.yaml", r#" 
kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry + +worker_map: + profiles: http://worker-profiles:50052 kafka_topics: profiles: cluster: profiles-cluster consumer_group: taskbroker-profiles raw: + namespace: profiles application: profiles + taskname: profiles.process + processing_deadline_duration: 30 profiles-retry: cluster: profiles-cluster consumer_group: taskbroker-profiles-retry @@ -1555,10 +1661,10 @@ kafka_clusters: assert_eq!( profiles.raw, Some(RawModeConfig { - namespace: None, + namespace: Some("profiles".to_owned()), application: Some("profiles".to_owned()), - taskname: None, - processing_deadline_duration: None, + taskname: Some("profiles.process".to_owned()), + processing_deadline_duration: Some(30), }) ); @@ -1585,7 +1691,7 @@ kafka_clusters: assert!(!clusters.contains_key("default")); // Test consumable_topic() and cluster() helpers - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); assert_eq!(topic_config.cluster, "profiles-cluster"); @@ -1596,6 +1702,59 @@ kafka_clusters: }); } + /// A raw topic missing a required field must be rejected at config time with + /// a clear error, not panic later when the consumer builds its deserializer. + #[test] + fn test_raw_mode_missing_field_rejected_cleanly() { + Jail::expect_with(|jail| { + // raw is missing `namespace` (application/taskname/deadline present). 
+ jail.create_file( + "config.yaml", + r#" +kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry + +worker_map: + profiles: http://worker-profiles:50052 + +kafka_topics: + profiles: + cluster: profiles-cluster + consumer_group: taskbroker-profiles + raw: + application: profiles + taskname: profiles.process + processing_deadline_duration: 30 + profiles-retry: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-retry + produce_only: true + profiles-dlq: + cluster: profiles-cluster + consumer_group: taskbroker-profiles-dlq + produce_only: true + +kafka_clusters: + profiles-cluster: + address: 10.0.0.1:9092 +"#, + )?; + + let args = Args { + config: Some("config.yaml".to_owned()), + }; + // A clean error, not a panic. + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("raw.namespace"), + "expected a clean missing-field error, got: {}", + err + ); + + Ok(()) + }); + } + #[test] fn test_multi_topic_config_from_env() { Jail::expect_with(|jail| { @@ -1644,7 +1803,7 @@ kafka_clusters: assert!(!clusters.contains_key("default")); // Test consumable_topic() helper - let (topic_name, _) = config.consumable_topic().unwrap(); + let (topic_name, _) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); Ok(()) @@ -1745,12 +1904,18 @@ kafka_clusters: } #[test] - fn test_multi_topic_rejects_multiple_consumable_topics() { + fn test_multi_topic_rejected_on_postgres() { Jail::expect_with(|jail| { - // Two consumable topics - the guard for this PR rejects this. + // Multiple consumable topics are allowed on sqlite but rejected on + // postgres, whose claim filtering can't distinguish partitions + // across topics. 
jail.create_file( "config.yaml", r#" +database_adapter: postgres +kafka_deadletter_topic: tasks-dlq +kafka_retry_topic: tasks-retry + kafka_topics: profiles: cluster: my-cluster @@ -1758,6 +1923,14 @@ kafka_topics: subscriptions: cluster: my-cluster consumer_group: taskbroker-subscriptions + tasks-retry: + cluster: my-cluster + consumer_group: taskbroker-retry + produce_only: true + tasks-dlq: + cluster: my-cluster + consumer_group: taskbroker-dlq + produce_only: true kafka_clusters: my-cluster: @@ -1771,7 +1944,7 @@ kafka_clusters: let err = Config::from_args(&args).unwrap_err(); assert!( err.to_string() - .contains("multi-topic consumption is not yet supported"), + .contains("not supported with the postgres database adapter"), "unexpected error: {}", err ); @@ -1822,7 +1995,7 @@ kafka_clusters: assert!(topics.get("profiles-dlq").unwrap().produce_only); // consumable_topic() returns the one consumable topic - let (topic_name, _) = config.consumable_topic().unwrap(); + let (topic_name, _) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); Ok(()) @@ -1976,7 +2149,7 @@ kafka_clusters: config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("profiles"); // Per-topic values win over the globals. assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "12000"); @@ -2023,7 +2196,7 @@ kafka_clusters: config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("profiles"); // No per-topic overrides, so the globals are used. 
assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "7000"); diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index a5a1ba73..9e608c5f 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -19,7 +19,6 @@ use super::consumer::{ pub struct ActivationBatcherConfig { pub producer_config: ClientConfig, - pub kafka_cluster: String, pub kafka_topic: String, pub kafka_long_topic: String, pub send_timeout_ms: u64, @@ -29,18 +28,12 @@ pub struct ActivationBatcherConfig { } impl ActivationBatcherConfig { - /// Convert from application configuration into ActivationBatcher config. - pub fn from_config(config: &Config) -> Self { - let (topic_name, topic_config) = config - .consumable_topic() - .expect("no consumable topic configured"); - let cluster = config - .cluster(&topic_config.cluster) - .expect("cluster not found"); - + /// Convert from application configuration into ActivationBatcher config for a + /// single consumed topic. Each consumer has its own batcher, so the topic is + /// passed explicitly rather than derived from "the" consumable topic. 
+ pub fn from_topic(config: &Config, topic_name: &str) -> Self { Self { producer_config: config.kafka_producer_config(), - kafka_cluster: cluster.address.clone(), kafka_topic: topic_name.to_owned(), kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, @@ -104,15 +97,23 @@ impl Reducer for ActivationBatcher { let namespace = &t.namespace; if runtime_config.drop_task_killswitch.contains(task_name) { - metrics::counter!("filter.drop_task_killswitch", "taskname" => task_name.clone()) - .increment(1); + metrics::counter!( + "filter.drop_task_killswitch", + "topic" => self.config.kafka_topic.clone(), + "taskname" => task_name.clone(), + ) + .increment(1); return Ok(()); } if let Some(expires_at) = t.expires_at && Utc::now() > expires_at { - metrics::counter!("filter.expired_at_consumer").increment(1); + metrics::counter!( + "filter.expired_at_consumer", + "topic" => self.config.kafka_topic.clone(), + ) + .increment(1); return Ok(()); } @@ -120,6 +121,7 @@ impl Reducer for ActivationBatcher { if forward_topic == self.config.kafka_topic { metrics::counter!( "filter.forward_task_demoted_namespace.skipped", + "topic" => self.config.kafka_topic.clone(), "namespace" => namespace.clone(), "taskname" => task_name.clone(), ) @@ -127,6 +129,7 @@ impl Reducer for ActivationBatcher { } else { metrics::counter!( "filter.forward_task_demoted_namespace", + "topic" => self.config.kafka_topic.clone(), "namespace" => namespace.clone(), "taskname" => task_name.clone(), ) @@ -147,16 +150,28 @@ impl Reducer for ActivationBatcher { return Ok(None); } - metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64); - metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64); + metrics::histogram!("consumer.batch_rows", "topic" => self.config.kafka_topic.clone()) + .record(self.batch.len() as f64); + metrics::histogram!("consumer.batch_bytes", "topic" => self.config.kafka_topic.clone()) + .record(self.batch_size as f64); 
// Send all forward batch in parallel if !self.forward_batch.is_empty() { let runtime_config = self.runtime_config_manager.read().await; - let forward_cluster = runtime_config - .demoted_topic_cluster - .clone() - .unwrap_or(self.config.kafka_cluster.clone()); + // The forwarding producer authenticates against the deadletter + // cluster, so default demoted forwarding there too (and consistently + // with upkeep) when no demoted_topic_cluster is configured. + let forward_cluster = + runtime_config + .demoted_topic_cluster + .clone() + .unwrap_or_else(|| { + self.config + .producer_config + .get("bootstrap.servers") + .expect("producer config always sets bootstrap.servers") + .to_string() + }); if self.producer_cluster != forward_cluster { let mut new_config = self.config.producer_config.clone(); new_config.set("bootstrap.servers", &forward_cluster); @@ -181,9 +196,12 @@ impl Reducer for ActivationBatcher { let results = join_all(sends).await; let success_count = results.iter().filter(|r| r.is_ok()).count(); - metrics::histogram!("consumer.forward_attempts").record(results.len() as f64); - metrics::histogram!("consumer.forward_successes").record(success_count as f64); - metrics::histogram!("consumer.forward_failures") + let topic = self.config.kafka_topic.clone(); + metrics::histogram!("consumer.forward_attempts", "topic" => topic.clone()) + .record(results.len() as f64); + metrics::histogram!("consumer.forward_successes", "topic" => topic.clone()) + .record(success_count as f64); + metrics::histogram!("consumer.forward_failures", "topic" => topic) .record((results.len() - success_count) as f64); self.forward_batch.clear(); @@ -252,7 +270,7 @@ demoted_namespaces: config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -275,7 +293,7 @@ 
demoted_namespaces: config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -304,7 +322,7 @@ demoted_namespaces: let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -334,7 +352,7 @@ demoted_namespaces: let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -380,11 +398,11 @@ demoted_topic: taskworker-demoted"#; config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); - let (_, topic_config) = config.consumable_topic().unwrap(); + let (_, topic_config) = config.consumable_topics().unwrap()[0]; let cluster_address = config .cluster(&topic_config.cluster) .unwrap() diff --git a/src/kafka/activation_writer.rs b/src/kafka/activation_writer.rs index 4f8c4589..2f024910 100644 --- a/src/kafka/activation_writer.rs +++ b/src/kafka/activation_writer.rs @@ -16,6 +16,9 @@ use super::consumer::{ }; pub struct ActivationWriterConfig { + /// The consumed topic this writer belongs to, used as a metric tag. Each + /// consumer has its own writer, so writer metrics are per-topic. 
+ pub topic: String, pub max_buf_len: usize, pub max_pending_activations: usize, pub max_processing_activations: usize, @@ -25,9 +28,11 @@ pub struct ActivationWriterConfig { } impl ActivationWriterConfig { - /// Convert from application configuration into ActivationWriter config. - pub fn from_config(config: &Config) -> Self { + /// Convert from application configuration into ActivationWriter config for a + /// single consumed topic. + pub fn from_topic(config: &Config, topic: &str) -> Self { Self { + topic: topic.to_owned(), db_max_size: config.db_max_size, max_buf_len: config.db_insert_batch_max_len, max_pending_activations: config.max_pending_count, @@ -133,6 +138,7 @@ impl Reducer for ActivationWriter { }; metrics::counter!( "consumer.inflight_activation_writer.backpressure", + "topic" => self.config.topic.clone(), "reason" => reason, ) .increment(1); @@ -160,11 +166,21 @@ impl Reducer for ActivationWriter { .min_by_key(|item| item.timestamp()) .unwrap(); - metrics::histogram!("consumer.inflight_activation_writer.write_to_store") - .record(write_to_store_start.elapsed()); - metrics::histogram!("consumer.inflight_activation_writer.insert_lag") - .record(lag.num_seconds() as f64); - metrics::counter!("consumer.inflight_activation_writer.stored").increment(entries); + metrics::histogram!( + "consumer.inflight_activation_writer.write_to_store", + "topic" => self.config.topic.clone(), + ) + .record(write_to_store_start.elapsed()); + metrics::histogram!( + "consumer.inflight_activation_writer.insert_lag", + "topic" => self.config.topic.clone(), + ) + .record(lag.num_seconds() as f64); + metrics::counter!( + "consumer.inflight_activation_writer.stored", + "topic" => self.config.topic.clone(), + ) + .increment(entries); debug!( "Inserted {:?} entries with max lag: {:?}s", entries, @@ -174,7 +190,11 @@ impl Reducer for ActivationWriter { } Err(err) => { error!("Unable to write to sqlite: {}", err); - 
metrics::counter!("consumer.inflight_activation_writer.write_failed").increment(1); + metrics::counter!( + "consumer.inflight_activation_writer.write_failed", + "topic" => self.config.topic.clone(), + ) + .increment(1); sleep(Duration::from_millis(self.config.write_failure_backoff_ms)).await; Ok(None) } @@ -216,6 +236,7 @@ mod tests { async fn test_writer_flush_batch(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, @@ -262,6 +283,7 @@ mod tests { async fn test_writer_flush_only_pending(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, @@ -297,6 +319,7 @@ mod tests { async fn test_writer_flush_only_delay(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 0, @@ -337,6 +360,7 @@ mod tests { async fn test_writer_backpressure_pending_limit_reached(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 0, @@ -386,6 +410,7 @@ mod tests { ) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, @@ -433,6 +458,7 @@ mod tests { async fn test_writer_backpressure_processing_limit_reached(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 
10, @@ -502,6 +528,7 @@ mod tests { async fn test_writer_backpressure_db_size_limit_reached(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), // 200 rows is ~50KB db_max_size: Some(50_000), max_buf_len: 100, @@ -534,6 +561,7 @@ mod tests { async fn test_writer_flush_empty_batch(#[case] adapter: &str) { let store = create_test_store(adapter).await; let writer_config = ActivationWriterConfig { + topic: "test-topic".to_string(), db_max_size: None, max_buf_len: 100, max_pending_activations: 10, diff --git a/src/kafka/admin.rs b/src/kafka/admin.rs index 5cbfcb51..61cfee57 100644 --- a/src/kafka/admin.rs +++ b/src/kafka/admin.rs @@ -1,31 +1,53 @@ -use anyhow::Error; +use anyhow::{Error, anyhow}; use rdkafka::ClientConfig; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use rdkafka::types::RDKafkaErrorCode; use tracing::info; +/// Create the given topics on the cluster described by `admin_config` if they +/// don't already exist. `topics` pairs each topic name with its partition count. +/// +/// `admin_config` should carry only `bootstrap.servers` + auth (see +/// [`Config::kafka_admin_config`](crate::config::Config::kafka_admin_config)) so +/// the topics are created on the right cluster. A pre-existing topic +/// (`TopicAlreadyExists`) is treated as success; any other per-topic failure is +/// surfaced so a misconfigured or unreachable cluster fails loudly at startup. 
pub async fn create_missing_topics( - kafka_client_config: ClientConfig, - topic: &str, - default_topic_partitions: i32, + admin_config: ClientConfig, + topics: &[(&str, i32)], ) -> Result<(), Error> { - let admin_client: AdminClient<_> = kafka_client_config + if topics.is_empty() { + return Ok(()); + } + + let admin_client: AdminClient<_> = admin_config .create() - .expect("Unable to reate rdkafka admin client"); + .map_err(|e| anyhow!("Unable to create rdkafka admin client: {e}"))?; + + info!("Creating topics {:?} if they do not already exist", topics); + let new_topics: Vec = topics + .iter() + .map(|(name, partitions)| NewTopic::new(name, *partitions, TopicReplication::Fixed(1))) + .collect(); - info!( - "Creating topic {:?} with {} partitions if it does not already exists", - topic, default_topic_partitions - ); - admin_client - .create_topics( - &vec![NewTopic::new( - topic, - default_topic_partitions, - TopicReplication::Fixed(1), - )], - &AdminOptions::new(), - ) + let results = admin_client + .create_topics(&new_topics, &AdminOptions::new()) .await?; + // `create_topics` returns one result per topic; a request-level error is + // already propagated above by `?`. Tolerate topics that already exist, but + // surface every other per-topic failure (auth denied, invalid config, ...). 
+ for result in results { + match result { + Ok(_) => {} + Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => { + info!("Topic {:?} already exists, skipping", topic); + } + Err((topic, code)) => { + return Err(anyhow!("Failed to create topic {:?}: {}", topic, code)); + } + } + } + Ok(()) } diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index aee767b1..3d445a37 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -41,7 +41,10 @@ pub async fn start_consumer( ) -> Result<(), Error> { let (client_shutdown_sender, client_shutdown_receiver) = oneshot::channel(); let (event_sender, event_receiver) = unbounded_channel(); - let context = KafkaContext::new(event_sender.clone()); + // Each consumer subscribes to a single topic; join defensively in case that + // ever changes. Used as the `topic` tag on this consumer's metrics. + let topic = topics.join(","); + let context = KafkaContext::new(event_sender.clone(), topic.clone()); let consumer: Arc> = Arc::new( kafka_client_config .create_with_context(context) @@ -54,13 +57,14 @@ pub async fn start_consumer( handle_shutdown_signals(event_sender.clone()); poll_consumer_client(consumer.clone(), client_shutdown_receiver); - metrics::gauge!("arroyo.consumer.current_partitions").set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); handle_events( consumer, event_receiver, activation_store, client_shutdown_sender, spawn_actors, + topic, ) .await } @@ -109,11 +113,16 @@ pub fn poll_consumer_client( #[derive(Debug)] pub struct KafkaContext { event_sender: UnboundedSender<(Event, SyncSender<()>)>, + /// The topic(s) this consumer is subscribed to, used as a metric tag. 
+ topic: String, } impl KafkaContext { - pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>) -> Self { - Self { event_sender } + pub fn new(event_sender: UnboundedSender<(Event, SyncSender<()>)>, topic: String) -> Self { + Self { + event_sender, + topic, + } } } @@ -140,8 +149,11 @@ impl ConsumerContext for KafkaContext { info!("Partition assignment event sent, waiting for rendezvous..."); let _ = rendezvous_receiver.recv(); info!("Rendezvous complete"); - metrics::counter!("arroyo.consumer.partitions_assigned.count") - .increment(tpl.count() as u64); + metrics::counter!( + "arroyo.consumer.partitions_assigned.count", + "topic" => self.topic.clone(), + ) + .increment(tpl.count() as u64); } Rebalance::Revoke(tpl) => { debug!("Got pre-rebalance callback, kind: Revoke"); @@ -159,8 +171,11 @@ impl ConsumerContext for KafkaContext { info!("Partition revocation event sent, waiting for rendezvous..."); let _ = rendezvous_receiver.recv(); info!("Rendezvous complete"); - metrics::counter!("arroyo.consumer.partitions_revoked.count") - .increment(tpl.count() as u64); + metrics::counter!( + "arroyo.consumer.partitions_revoked.count", + "topic" => self.topic.clone(), + ) + .increment(tpl.count() as u64); } Rebalance::Error(err) => { debug!("Got pre-rebalance callback, kind: Error"); @@ -337,6 +352,7 @@ pub async fn handle_events( Arc>, &BTreeSet<(String, i32)>, ) -> ActorHandles, + topic: String, ) -> Result<(), anyhow::Error> { const CALLBACK_DURATION: Duration = Duration::from_secs(4); @@ -363,7 +379,8 @@ pub async fn handle_events( info!("Received event: {:?}", event); state = match (state, event) { (ConsumerState::Ready, Event::Assign(tpl)) => { - metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()) + .set(tpl.len() as f64); // Note: This assumes we only process one topic per consumer. 
let mut partitions = Vec::::new(); for (_, partition) in tpl.iter() { @@ -386,7 +403,7 @@ pub async fn handle_events( ); activation_store.assign_partitions(vec![]).unwrap(); handles.shutdown(CALLBACK_DURATION).await; - metrics::gauge!("arroyo.consumer.current_partitions").set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); ConsumerState::Ready } (ConsumerState::Consuming(handles, _), Event::Shutdown) => { @@ -394,7 +411,7 @@ pub async fn handle_events( handles.shutdown(CALLBACK_DURATION).await; debug!("Signaling shutdown to client..."); shutdown_client.take(); - metrics::gauge!("arroyo.consumer.current_partitions").set(0); + metrics::gauge!("arroyo.consumer.current_partitions", "topic" => topic.clone()).set(0); ConsumerState::Stopped } (ConsumerState::Stopped, _) => { diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 0ab18d0e..61f484b1 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -18,10 +18,19 @@ pub struct DeserializeConfig { } impl DeserializeConfig { - pub fn from_config(config: &Config) -> Self { + /// Build the deserializer config for a single consumed topic. Raw mode is + /// taken from that topic's `kafka_topics..raw`, so each consumer + /// (legacy single-topic included) deserializes according to its own topic. 
+ pub fn from_topic(config: &Config, topic_name: &str) -> Self { + let raw_config = config + .kafka_topics + .get(topic_name) + .and_then(|topic| topic.raw.as_ref()) + .map(|raw| RawConfig::from_topic(config, topic_name, raw)); + Self { activation_config: DeserializeActivationConfig::from_config(config), - raw_config: RawConfig::from_config(config), + raw_config, retry_topic: config.kafka_retry_topic.clone(), } } diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 332c727e..9ab1d11e 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -48,6 +48,7 @@ pub fn new( metrics::histogram!( "consumer.message.payload_size_bytes", + "topic" => msg.topic().to_string(), "namespace" => namespace.clone(), "taskname" => taskname.clone() ) diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 89577850..40c53a02 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -10,7 +10,7 @@ use rdkafka::message::OwnedMessage; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use uuid::Uuid; -use crate::config::Config; +use crate::config::{Config, RawModeConfig}; use crate::store::activation::{Activation, ActivationStatus}; use super::deserialize_activation::bucket_from_id; @@ -29,43 +29,45 @@ pub struct RawConfig { } impl RawConfig { - pub fn from_config(config: &Config) -> Option { - if !config.raw_mode { - return None; - } - let application = config - .raw_application + /// Build the raw-mode deserializer config for a single topic from that + /// topic's [`RawModeConfig`]. The legacy global `raw_*` fields are migrated + /// into the topic's `raw` during config normalization, so this is the single + /// runtime source of truth for both legacy and multi-topic formats. 
+ pub fn from_topic(config: &Config, topic_name: &str, raw: &RawModeConfig) -> Self { + let application = raw + .application .clone() - .expect("raw_application required when raw_mode is enabled"); + .expect("raw application required when a topic enables raw mode"); assert!( config.worker_map.contains_key(&application), - "raw_application '{}' must exist in worker_map", + "raw application '{}' must exist in worker_map", application ); + // A raw topic's messages aren't activations, so its retries must go to a + // separate (activation-encoded) retry topic, never back to itself. if let Some(ref retry_topic) = config.kafka_retry_topic { - let (main_topic, _) = config - .consumable_topic() - .expect("no consumable topic configured"); assert!( - retry_topic != main_topic, - "kafka_retry_topic cannot equal kafka_topic when raw_mode is enabled" + retry_topic != topic_name, + "kafka_retry_topic cannot equal raw topic '{topic_name}'" ); } - Some(Self { - namespace: config - .raw_namespace + Self { + namespace: raw + .namespace .clone() - .expect("raw_namespace required when raw_mode is enabled"), + .expect("raw namespace required when a topic enables raw mode"), application, - taskname: config - .raw_taskname + taskname: raw + .taskname .clone() - .expect("raw_taskname required when raw_mode is enabled"), - processing_deadline_duration: config.raw_processing_deadline_duration, - }) + .expect("raw taskname required when a topic enables raw mode"), + processing_deadline_duration: raw + .processing_deadline_duration + .expect("raw processing_deadline_duration required when a topic enables raw mode"), + } } } @@ -141,6 +143,7 @@ pub fn new(config: RawConfig) -> impl Fn(Arc) -> Result msg.topic().to_string(), "namespace" => config.namespace.clone(), "taskname" => config.taskname.clone() ) diff --git a/src/main.rs b/src/main.rs index dd3f9939..93f971d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeMap, 
HashMap}; use std::sync::Arc; use std::time::Duration; @@ -75,25 +75,18 @@ async fn main() -> Result<(), Error> { // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { - let kafka_client_config = config.kafka_consumer_config(); - let (main_topic, _) = config - .consumable_topic() - .map_err(|e| anyhow!("invalid config: {}", e))?; - create_missing_topics( - kafka_client_config.clone(), - main_topic, - config.default_topic_partitions, - ) - .await?; - - // Create retry topic if configured - if let Some(ref retry_topic) = config.kafka_retry_topic { - create_missing_topics( - kafka_client_config, - retry_topic, - config.default_topic_partitions, - ) - .await?; + // Group every declared topic by its cluster so each is created on the + // right brokers (main, retry, deadletter and produce-only topics, which + // may live on different clusters). + let mut topics_by_cluster: BTreeMap<&str, Vec<(&str, i32)>> = BTreeMap::new(); + for (topic_name, topic_config) in &config.kafka_topics { + topics_by_cluster + .entry(topic_config.cluster.as_str()) + .or_default() + .push((topic_name.as_str(), config.default_topic_partitions)); + } + for (cluster, topics) in topics_by_cluster { + create_missing_topics(config.kafka_admin_config(cluster), &topics).await?; } } @@ -157,25 +150,30 @@ async fn main() -> Result<(), Error> { } }); - // Consumer from kafka - let consumer_task = taskbroker::tokio::spawn({ + // Consumer(s) from kafka. Each consumed topic gets its own consumer (own + // group.id and cluster), so we spawn one consumer task per consumable topic, + // all sharing the one activation store. 
+ let consumer_topics: Vec = config + .consumable_topics() + .expect("invalid config: no consumable topic") + .into_iter() + .map(|(name, _)| name.to_owned()) + .collect(); + + let mut consumer_tasks: Vec<(String, JoinHandle>)> = Vec::new(); + for topic in consumer_topics { let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); + let task_topic = topic.clone(); - // Build list of topics to consume from - let (main_topic, _) = consumer_config - .consumable_topic() - .expect("invalid config: no consumable topic"); - let topics_to_consume = [main_topic.to_owned()]; - - async move { + let handle = taskbroker::tokio::spawn(async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need // an outer select here like the other tasks. - let topic_refs: Vec<&str> = topics_to_consume.iter().map(|s| s.as_str()).collect(); + let topic_refs = [task_topic.as_str()]; start_consumer( &topic_refs, - &consumer_config.kafka_consumer_config(), + &consumer_config.kafka_consumer_config_for(&task_topic), consumer_store.clone(), processing_strategy!({ err: @@ -185,23 +183,24 @@ async fn main() -> Result<(), Error> { ), map: - deserialize::new(DeserializeConfig::from_config(&consumer_config)), + deserialize::new(DeserializeConfig::from_topic(&consumer_config, &task_topic)), reduce: ActivationBatcher::new( - ActivationBatcherConfig::from_config(&consumer_config), + ActivationBatcherConfig::from_topic(&consumer_config, &task_topic), runtime_config_manager.clone() ), ActivationWriter::new( consumer_store.clone(), - ActivationWriterConfig::from_config(&consumer_config) + ActivationWriterConfig::from_topic(&consumer_config, &task_topic) ), }), ) .await - } - }); + }); + consumer_tasks.push((topic, handle)); + } // Status update flush task let (status_update_tx, status_update_task) = if config.batch_status_updates { @@ -337,10 +336,14 @@ async fn main() -> Result<(), Error> { 
.on_sigint() .on_signal(SignalKind::hangup()) .on_signal(SignalKind::quit()) - .on_completion(log_task_completion("consumer", consumer_task)) .on_completion(log_task_completion("upkeep_task", upkeep_task)) .on_completion(log_task_completion("maintenance_task", maintenance_task)); + for (topic, handle) in consumer_tasks { + departure = + departure.on_completion(log_task_completion(format!("consumer:{topic}"), handle)); + } + if let Some(task) = grpc_server_task { departure = departure.on_completion(log_task_completion("grpc_server", task)); } diff --git a/src/runtime_config.rs b/src/runtime_config.rs index 344a0e07..83dae1e7 100644 --- a/src/runtime_config.rs +++ b/src/runtime_config.rs @@ -16,7 +16,8 @@ pub struct RuntimeConfig { /// to prevent them from blocking other tasks in shared namespaces. pub demoted_namespaces: Vec, /// The cluster to forward tasks from demoted namespaces to. - /// If not set, the current cluster taskbroker is consuming from will be used. + /// If not set, the deadletter cluster is used (the forwarding producer + /// authenticates against it, so it is the only reliably reachable default). pub demoted_topic_cluster: Option, /// The topic to forward tasks from demoted namespaces to. 
/// If not set, the taskworker-long topic will be used diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 613abd8d..4bc53971 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -22,7 +22,7 @@ use libsqlite3_sys::{ }; use prost::Message; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; -use tracing::{instrument, warn}; +use tracing::{debug, instrument, warn}; use crate::config::Config; use crate::store::activation::{Activation, ActivationStatus}; @@ -436,7 +436,9 @@ impl ActivationStore for SqliteStore { } fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { - warn!("assign_partitions: {:?}", partitions); + // sqlite owns its whole DB regardless of partition assignment, so this + // is a no-op. Fires once per consumer, hence debug rather than warn. + debug!("assign_partitions: {:?}", partitions); Ok(()) } diff --git a/src/test_utils.rs b/src/test_utils.rs index 31455a0f..0d13c4c8 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -354,12 +354,12 @@ pub fn create_producer(config: Arc) -> Arc { /// Reset a kafka topic by destroying it and recreating it. 
pub async fn reset_topic(config: Arc) { + let (main_topic, _) = config.consumable_topics().expect("no consumable topic")[0]; let admin_client: AdminClient<_> = config - .kafka_consumer_config() + .kafka_consumer_config_for(main_topic) .create() .expect("Could not create admin client"); - let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); let options = AdminOptions::default(); admin_client .delete_topics(&[main_topic, &config.kafka_deadletter_topic], &options) @@ -385,7 +385,7 @@ pub async fn consume_topic( num_records: usize, ) -> Vec { let consumer: StreamConsumer = config - .kafka_consumer_config() + .kafka_consumer_config_for(topic) .create() .expect("could not create consumer"); consumer.subscribe(&[topic]).expect("could not subscribe"); diff --git a/src/upkeep.rs b/src/upkeep.rs index e407cd10..d2a069f3 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -155,13 +155,7 @@ pub async fn do_upkeep( // 1. Handle retry tasks let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { - // Use retry topic if configured, otherwise fall back to main topic - let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); - let main_topic_owned = main_topic.to_owned(); - let retry_target_topic = config - .kafka_retry_topic - .as_ref() - .unwrap_or(&main_topic_owned); + let retry_target_topic = config.retry_topic().to_owned(); // 2. Append retries to kafka let deliveries = retries @@ -322,23 +316,32 @@ pub async fn do_upkeep( // 12. 
Forward tasks from demoted namespaces to `runtime_config.demoted_topic` let demoted_namespaces = runtime_config.demoted_namespaces.clone(); - let (main_topic, main_topic_config) = config.consumable_topic().expect("no consumable topic"); - let main_cluster = config - .cluster(&main_topic_config.cluster) - .expect("cluster not found") - .address - .clone(); + let consumable = config.consumable_topics().expect("no consumable topic"); + // The forward producer authenticates against the deadletter cluster, so + // default demoted forwarding there too (and consistently with the + // consumer-side batcher) when no demoted_topic_cluster is configured. For + // legacy configs where kafka_deadletter_cluster is unset the deadletter + // cluster address defaults to the consumed cluster's address, so this is + // unchanged there; it only differs when a distinct deadletter cluster is set. let forward_cluster = runtime_config .demoted_topic_cluster .clone() - .unwrap_or(main_cluster.clone()); + .unwrap_or_else(|| config.kafka_producer_cluster().address.clone()); let forward_topic = runtime_config .demoted_topic .clone() .unwrap_or(config.kafka_long_topic.clone()); - let same_cluster = forward_cluster == main_cluster; - let same_topic = forward_topic == main_topic; - if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) { + // Forwarding to a topic+cluster this broker itself consumes would loop the + // tasks straight back in, so treat that as a no-op. For a single consumed + // topic this is exactly the old `same_topic && same_cluster` guard. 
+ let forwards_to_consumed = consumable.iter().any(|(name, topic_config)| { + *name == forward_topic + && config + .cluster(&topic_config.cluster) + .map(|c| c.address == forward_cluster) + .unwrap_or(false) + }); + if !(demoted_namespaces.is_empty() || forwards_to_consumed) { let forward_demoted_start = Instant::now(); // The forwarding producer reuses the deadletter cluster's credentials // (see Config::kafka_producer_config) and only overrides @@ -765,7 +768,7 @@ mod tests { assert_eq!(store.count().await.unwrap(), 1); assert_eq!(result_context.retried, 1); - let (main_topic, _) = config.consumable_topic().unwrap(); + let (main_topic, _) = config.consumable_topics().unwrap()[0]; let messages = consume_topic(config.clone(), main_topic, 1).await; assert_eq!(messages.len(), 1); let activation = &messages[0];