From e0c6fcc62c6cc6a0bde58649191b1588652d3f1b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 11 May 2026 16:01:57 +0200 Subject: [PATCH 01/10] feat(taskbroker): Implement retry support for raw topics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add retry support for raw/passthrough topics (e.g. `ingest-events`) where tasks don't have retry_state embedded in the message. Changes: - Config: Add `kafka_retry_topic` option for dedicated retry topic - Store: Add `update_retry_state` method to update activation's retry_state - gRPC: Handle `max_retries` in SetTaskStatus, call store.update_retry_state - Upkeep: Route retries to dedicated retry topic when configured - Consumer: Subscribe to both main and retry topics - Deserializer: Topic-aware routing (retry topic always uses activation deserializer) - Python client: Extract max_retries from Retry config, send in SetTaskStatusRequest When a worker reports RETRY status with max_retries, the broker updates the activation's retry_state and routes the retry to the dedicated retry topic. This prevents retries from polluting the main topic where other consumers (like SBC) can't parse activations. See https://www.notion.so/3448b10e4b5d80e7a1efee6145d504c2 → Stage 4 Depends on: https://github.com/getsentry/sentry-protos/pull/251 ref STREAM-981 Co-Authored-By: Claude Opus 4.5 --- Cargo.lock | 7 ++- Cargo.toml | 2 +- clients/python/src/taskbroker_client/types.py | 1 + .../src/taskbroker_client/worker/client.py | 2 + .../taskbroker_client/worker/workerchild.py | 4 ++ src/config.rs | 6 +++ src/fetch/tests.rs | 4 ++ src/grpc/server.rs | 10 +++++ src/grpc/server_tests.rs | 6 +++ src/kafka/deserialize.rs | 14 ++++++ src/main.rs | 22 +++++++++- src/push/tests.rs | 3 ++ src/store/adapters/postgres.rs | 43 ++++++++++++++++++- src/store/adapters/sqlite.rs | 41 +++++++++++++++++- src/store/traits.rs | 4 ++ src/upkeep.rs | 10 ++++- 16 files changed, 168 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 387eb531..0f5dd020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1459,7 +1459,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2677,9 +2677,8 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.8.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60dfb8c1b03c3f6e800a91eca7daea05205dd87f63b8d70b50b7e2211a2e0be2" +version = "0.8.29" +source = "git+https://github.com/getsentry/sentry-protos?branch=feat%2Ftaskbroker-max-retries#fb9fadfa88bf6ea3eff26b7d5b068b4e6c3869f1" dependencies = [ "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 6e618f47..4ad45bfb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ sentry = { version = "0.41.0", default-features = false, features = [ "tracing", "logs" ] } -sentry_protos = "0.8.13" +sentry_protos = { git = "https://github.com/getsentry/sentry-protos", branch = "feat/taskbroker-max-retries" } serde = "1.0.214" serde_bytes = "0.11" serde_yaml = "0.9.34" diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index 326ddd96..c0acdba4 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -68,3 +68,4 @@ class ProcessingResult: status: TaskActivationStatus.ValueType host: str receive_timestamp: float + max_retries: int | None = None diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 2a1c56b2..125c31cd 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -445,6 +445,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, + max_retries=processing_result.max_retries, # type: ignore[call-arg] ) try: @@ -566,6 +567,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=None, + max_retries=processing_result.max_retries, # type: ignore[call-arg] ) retries = 0 diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index fa2e8476..72e77a7e 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -234,6 +234,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: set_current_task(inflight.activation) next_state = TASK_ACTIVATION_STATUS_FAILURE + max_retries_val: int | None = None # Use time.time() so we can measure against activation.received_at execution_start_time = time.time() try: @@ -261,6 +262,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: retry = task_func.retry if retry and retry.should_retry(inflight.activation.retry_state, err): next_state = TASK_ACTIVATION_STATUS_RETRY + max_retries_val = retry._times else: next_state = TASK_ACTIVATION_STATUS_FAILURE except Exception as err: @@ -279,6 +281,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: }, ) next_state = TASK_ACTIVATION_STATUS_RETRY + max_retries_val = retry._times elif retry.max_attempts_reached(inflight.activation.retry_state): with sentry_sdk.isolation_scope() as scope: if should_capture_error: @@ -321,6 +324,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: status=next_state, host=inflight.host, receive_timestamp=inflight.receive_timestamp, + max_retries=max_retries_val, ) ) diff --git a/src/config.rs b/src/config.rs index 700f1a5a..f15ace99 100644 --- a/src/config.rs +++ b/src/config.rs @@ -129,6 +129,11 @@ pub struct Config { /// The location to the DLQ private key file pub kafka_deadletter_ssl_key_location: Option, + /// The topic to publish retry task activations to. + /// When set, retries go to this topic instead of kafka_topic. + /// Required for raw_mode where the main topic has other consumers. + pub kafka_retry_topic: Option, + /// The default number of partitions for a topic pub default_topic_partitions: i32, @@ -362,6 +367,7 @@ impl Default for Config { kafka_deadletter_ssl_ca_location: None, kafka_deadletter_ssl_certificate_location: None, kafka_deadletter_ssl_key_location: None, + kafka_retry_topic: None, default_topic_partitions: 1, kafka_session_timeout_ms: 6000, kafka_auto_commit_interval_ms: 5000, diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 0d11da63..6260cba7 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -122,6 +122,10 @@ impl InflightActivationStore for MockStore { unimplemented!() } + async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> Result<(), Error> { + unimplemented!() + } + async fn set_processing_deadline( &self, _id: &str, diff --git a/src/grpc/server.rs b/src/grpc/server.rs index cf4a6bdd..379f5577 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -101,6 +101,16 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } + // If status is Retry and max_retries is provided, update the activation's retry_state. + // This allows workers to communicate retry policy for tasks from raw topics. + if status == InflightActivationStatus::Retry { + if let Some(max_retries) = request.get_ref().max_retries { + if let Err(e) = self.store.update_retry_state(&id, max_retries).await { + error!(?id, ?max_retries, "Failed to update retry state: {:?}", e); + } + } + } + match self.store.set_status(&id, status).await { Ok(Some(_)) => metrics::counter!( "grpc_server.set_status", diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 2b986d66..4098a3f4 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -68,6 +68,7 @@ async fn test_set_task_status(#[case] adapter: &str) { id: "test_task".to_string(), status: 5, // Complete fetch_next_task: None, + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -89,6 +90,7 @@ async fn test_set_task_status_invalid(#[case] adapter: &str) { id: "test_task".to_string(), status: 1, // Invalid fetch_next_task: None, + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_err()); @@ -221,6 +223,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) { namespace: None, application: None, }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -256,6 +259,7 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) { application: Some("hammers".into()), namespace: None, }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -296,6 +300,7 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { application: Some("no-matches".into()), namespace: None, }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -324,6 +329,7 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte application: None, namespace: Some(namespace), }), + max_retries: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 0a6d0919..3679b1fd 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Error; +use rdkafka::Message; use rdkafka::message::OwnedMessage; use crate::config::Config; @@ -12,6 +13,8 @@ use super::deserialize_raw::{self, RawConfig}; pub struct DeserializeConfig { activation_config: DeserializeActivationConfig, raw_config: Option, + /// Retry topic always contains activations, even in raw_mode. + retry_topic: Option, } impl DeserializeConfig { @@ -19,6 +22,7 @@ impl DeserializeConfig { Self { activation_config: DeserializeActivationConfig::from_config(config), raw_config: RawConfig::from_config(config), + retry_topic: config.kafka_retry_topic.clone(), } } } @@ -26,13 +30,23 @@ impl DeserializeConfig { /// Create a unified deserializer that handles both normal and raw modes. /// In raw mode, raw Kafka bytes are wrapped into a TaskActivation. /// In normal mode, Kafka messages are expected to contain encoded TaskActivation protos. +/// Messages from the retry topic are always deserialized as activations. pub fn new( config: DeserializeConfig, ) -> impl Fn(Arc) -> Result { let raw_deserializer = config.raw_config.map(deserialize_raw::new); let activation_deserializer = deserialize_activation::new(config.activation_config); + let retry_topic = config.retry_topic; move |msg: Arc| { + // Messages from the retry topic are always activations + if let Some(ref retry_topic) = retry_topic { + if msg.topic() == retry_topic { + return activation_deserializer(msg); + } + } + + // For main topic: use raw deserializer in raw_mode, else activation deserializer if let Some(ref raw_deserializer) = raw_deserializer { raw_deserializer(msg) } else { diff --git a/src/main.rs b/src/main.rs index c91a766f..f907f726 100644 --- a/src/main.rs +++ b/src/main.rs @@ -86,11 +86,21 @@ async fn main() -> Result<(), Error> { if config.create_missing_topics { let kafka_client_config = config.kafka_consumer_config(); create_missing_topics( - kafka_client_config, + kafka_client_config.clone(), &config.kafka_topic, config.default_topic_partitions, ) .await?; + + // Create retry topic if configured + if let Some(ref retry_topic) = config.kafka_retry_topic { + create_missing_topics( + kafka_client_config, + retry_topic, + config.default_topic_partitions, + ) + .await?; + } } if config.full_vacuum_on_start { @@ -158,11 +168,19 @@ async fn main() -> Result<(), Error> { let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); + + // Build list of topics to consume from + let mut topics_to_consume = vec![consumer_config.kafka_topic.clone()]; + if let Some(ref retry_topic) = consumer_config.kafka_retry_topic { + topics_to_consume.push(retry_topic.clone()); + } + async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need // an outer select here like the other tasks. + let topic_refs: Vec<&str> = topics_to_consume.iter().map(|s| s.as_str()).collect(); start_consumer( - &[&consumer_config.kafka_topic], + &topic_refs, &consumer_config.kafka_consumer_config(), consumer_store.clone(), processing_strategy!({ diff --git a/src/push/tests.rs b/src/push/tests.rs index ca479a6d..3349bdee 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -103,6 +103,9 @@ impl InflightActivationStore for MockStore { ) -> anyhow::Result> { Ok(None) } + async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> anyhow::Result<()> { + Ok(()) + } async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { 0.0 } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 8f087b59..4590a800 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -11,7 +11,8 @@ use anyhow::{Error, anyhow}; use async_backtrace::framed; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use prost::Message; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -654,6 +655,46 @@ impl InflightActivationStore for PostgresActivationStore { Ok(Some(row.into())) } + #[instrument(skip_all)] + #[framed] + async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> { + let mut conn = self.acquire_write_conn_metric("update_retry_state").await?; + + // Fetch the current activation + let row: Option = sqlx::query_as( + "SELECT *, kafka_offset AS offset FROM inflight_taskactivations WHERE id = $1", + ) + .bind(id) + .fetch_optional(&mut *conn) + .await?; + + let Some(row) = row else { + return Ok(()); + }; + + // Decode the activation, update retry_state, re-encode + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + + let retry_state = activation.retry_state.get_or_insert_with(|| RetryState { + attempts: 0, + max_attempts: 0, + on_attempts_exceeded: OnAttemptsExceeded::Discard.into(), + delay_on_retry: None, + at_most_once: Some(false), + }); + retry_state.max_attempts = max_retries; + + let updated_bytes = activation.encode_to_vec(); + + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(updated_bytes) + .bind(id) + .execute(&mut *conn) + .await?; + + Ok(()) + } + #[instrument(skip_all)] #[framed] async fn set_processing_deadline( diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 8692ac0c..9bce8367 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -20,7 +20,8 @@ use libsqlite3_sys::{ SQLITE_DBSTATUS_LOOKASIDE_USED, SQLITE_DBSTATUS_SCHEMA_USED, SQLITE_DBSTATUS_STMT_USED, SQLITE_OK, sqlite3_db_status, }; -use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use prost::Message; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -706,6 +707,44 @@ impl InflightActivationStore for SqliteActivationStore { Ok(Some(row.into())) } + #[instrument(skip_all)] + async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> { + let mut conn = self.acquire_write_conn_metric("update_retry_state").await?; + + // Fetch the current activation + let row: Option = + sqlx::query_as("SELECT * FROM inflight_taskactivations WHERE id = $1") + .bind(id) + .fetch_optional(&mut *conn) + .await?; + + let Some(row) = row else { + return Ok(()); + }; + + // Decode the activation, update retry_state, re-encode + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + + let retry_state = activation.retry_state.get_or_insert_with(|| RetryState { + attempts: 0, + max_attempts: 0, + on_attempts_exceeded: OnAttemptsExceeded::Discard.into(), + delay_on_retry: None, + at_most_once: Some(false), + }); + retry_state.max_attempts = max_retries; + + let updated_bytes = activation.encode_to_vec(); + + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(updated_bytes) + .bind(id) + .execute(&mut *conn) + .await?; + + Ok(()) + } + #[instrument(skip_all)] async fn set_processing_deadline( &self, diff --git a/src/store/traits.rs b/src/store/traits.rs index c21a9d41..1f4a0e97 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -78,6 +78,10 @@ pub trait InflightActivationStore: Send + Sync { status: InflightActivationStatus, ) -> Result, Error>; + /// Update the retry_state in the activation blob. + /// Called when worker provides max_retries with a Retry status. + async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error>; + /// COUNT OPERATIONS /// Get the age of the oldest pending activation in seconds async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; diff --git a/src/upkeep.rs b/src/upkeep.rs index 5336f063..23c8f00d 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -151,20 +151,26 @@ pub async fn do_upkeep( // 1. Handle retry tasks let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { + // Use retry topic if configured, otherwise fall back to main topic + let retry_target_topic = config + .kafka_retry_topic + .as_ref() + .unwrap_or(&config.kafka_topic); + // 2. Append retries to kafka let deliveries = retries .into_iter() .map(|inflight| { let producer = producer.clone(); let config = config.clone(); + let target_topic = retry_target_topic.clone(); async move { let activation = TaskActivation::decode(&inflight.activation as &[u8]).unwrap(); let serialized = create_retry_activation(&activation).encode_to_vec(); let delivery = producer .send( - FutureRecord::<(), Vec>::to(&config.kafka_topic) - .payload(&serialized), + FutureRecord::<(), Vec>::to(&target_topic).payload(&serialized), Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)), ) .await; From 95413d00cb2e8fd88c279cf0116a2fefdd11481a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 11 May 2026 16:08:01 +0200 Subject: [PATCH 02/10] fix: Convert max_retries to max_attempts correctly max_retries (from Python's @task decorator) excludes the initial attempt, while max_attempts includes it. Add 1 when storing to retry_state. Example: @task(max_retries=3) means 4 total attempts (1 initial + 3 retries) Co-Authored-By: Claude Opus 4.5 --- src/store/adapters/postgres.rs | 3 ++- src/store/adapters/sqlite.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 4590a800..6c39428d 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -682,7 +682,8 @@ impl InflightActivationStore for PostgresActivationStore { delay_on_retry: None, at_most_once: Some(false), }); - retry_state.max_attempts = max_retries; + // max_retries excludes initial attempt, max_attempts includes it + retry_state.max_attempts = max_retries + 1; let updated_bytes = activation.encode_to_vec(); diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 9bce8367..fd50e7f5 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -732,7 +732,8 @@ impl InflightActivationStore for SqliteActivationStore { delay_on_retry: None, at_most_once: Some(false), }); - retry_state.max_attempts = max_retries; + // max_retries excludes initial attempt, max_attempts includes it + retry_state.max_attempts = max_retries + 1; let updated_bytes = activation.encode_to_vec(); From 5354ee766179eef4e52abe204de05624d66f03cc Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 18 May 2026 16:33:27 +0200 Subject: [PATCH 03/10] update sentry-protos, remove additional queries --- Cargo.lock | 7 +- Cargo.toml | 2 +- benches/store_bench.rs | 6 +- clients/python/pyproject.toml | 2 +- clients/python/src/taskbroker_client/types.py | 2 +- .../src/taskbroker_client/worker/client.py | 4 +- .../taskbroker_client/worker/workerchild.py | 7 +- src/fetch/tests.rs | 5 +- src/grpc/server.rs | 18 +++-- src/grpc/server_tests.rs | 12 ++-- src/push/tests.rs | 4 +- src/store/adapters/postgres.rs | 72 ++++++++----------- src/store/adapters/sqlite.rs | 70 ++++++++---------- src/store/tests.rs | 34 ++++----- src/store/traits.rs | 8 +-- 15 files changed, 111 insertions(+), 142 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f5dd020..e8d9be9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1459,7 +1459,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2677,8 +2677,9 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.8.29" -source = "git+https://github.com/getsentry/sentry-protos?branch=feat%2Ftaskbroker-max-retries#fb9fadfa88bf6ea3eff26b7d5b068b4e6c3869f1" +version = "0.8.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cd6296e52dcf5924b93fc991863571eb9f1fd14267c737c0311a5743914f69" dependencies = [ "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 4ad45bfb..afa3f334 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ sentry = { version = "0.41.0", default-features = false, features = [ "tracing", "logs" ] } -sentry_protos = { git = "https://github.com/getsentry/sentry-protos", branch = "feat/taskbroker-max-retries" } +sentry_protos = "0.8.32" serde = "1.0.214" serde_bytes = "0.11" serde_yaml = "0.9.34" diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 3c72d7e8..fde401e2 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -121,7 +121,11 @@ async fn set_status(num_activations: u32, num_workers: u32) { for task_id in 0..num_activations { if task_id % num_workers == worker_idx { store - .set_status(&format!("id_{task_id}"), InflightActivationStatus::Complete) + .set_status( + &format!("id_{task_id}"), + InflightActivationStatus::Complete, + None, + ) .await .unwrap(); } diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index ac455616..b4ad60ca 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" dependencies = [ "sentry-arroyo>=2.38.7", "sentry-sdk[http2]>=2.43.0", - "sentry-protos>=0.8.13", + "sentry-protos>=0.8.32", "confluent_kafka>=2.3.0", "cronsim>=2.6", "grpcio>=1.67.0", diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index c0acdba4..50727d08 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -68,4 +68,4 @@ class ProcessingResult: status: TaskActivationStatus.ValueType host: str receive_timestamp: float - max_retries: int | None = None + max_attempts: int | None = None diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 125c31cd..01cc3788 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -445,7 +445,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, - max_retries=processing_result.max_retries, # type: ignore[call-arg] + max_attempts=processing_result.max_attempts, # type: ignore[call-arg] ) try: @@ -567,7 +567,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=None, - max_retries=processing_result.max_retries, # type: ignore[call-arg] + max_attempts=processing_result.max_attempts, # type: ignore[call-arg] ) retries = 0 diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 72e77a7e..d2811e5d 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -234,7 +234,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: set_current_task(inflight.activation) next_state = TASK_ACTIVATION_STATUS_FAILURE - max_retries_val: int | None = None + max_attempts_val: int | None = None # Use time.time() so we can measure against activation.received_at execution_start_time = time.time() try: @@ -262,7 +262,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: retry = task_func.retry if retry and retry.should_retry(inflight.activation.retry_state, err): next_state = TASK_ACTIVATION_STATUS_RETRY - max_retries_val = retry._times + max_attempts_val = retry._times + 1 else: next_state = TASK_ACTIVATION_STATUS_FAILURE except Exception as err: @@ -281,7 +281,6 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: }, ) next_state = TASK_ACTIVATION_STATUS_RETRY - max_retries_val = retry._times elif retry.max_attempts_reached(inflight.activation.retry_state): with sentry_sdk.isolation_scope() as scope: if should_capture_error: @@ -324,7 +323,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: status=next_state, host=inflight.host, receive_timestamp=inflight.receive_timestamp, - max_retries=max_retries_val, + max_attempts=max_attempts_val, ) ) diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 6260cba7..3fd548cb 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -118,14 +118,11 @@ impl InflightActivationStore for MockStore { &self, _id: &str, _status: InflightActivationStatus, + _max_attempts: Option, ) -> Result, Error> { unimplemented!() } - async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> Result<(), Error> { - unimplemented!() - } - async fn set_processing_deadline( &self, _id: &str, diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 379f5577..207d464d 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -101,17 +101,15 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } - // If status is Retry and max_retries is provided, update the activation's retry_state. - // This allows workers to communicate retry policy for tasks from raw topics. - if status == InflightActivationStatus::Retry { - if let Some(max_retries) = request.get_ref().max_retries { - if let Err(e) = self.store.update_retry_state(&id, max_retries).await { - error!(?id, ?max_retries, "Failed to update retry state: {:?}", e); - } - } - } + // If status is Retry and max_attempts is provided, pass it to set_status + // to update the activation's retry_state in the same DB operation. + let max_attempts = if status == InflightActivationStatus::Retry { + request.get_ref().max_attempts + } else { + None + }; - match self.store.set_status(&id, status).await { + match self.store.set_status(&id, status, max_attempts).await { Ok(Some(_)) => metrics::counter!( "grpc_server.set_status", "result" => "ok", diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 4098a3f4..30664adc 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -68,7 +68,7 @@ async fn test_set_task_status(#[case] adapter: &str) { id: "test_task".to_string(), status: 5, // Complete fetch_next_task: None, - max_retries: None, + max_attempts: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -90,7 +90,7 @@ async fn test_set_task_status_invalid(#[case] adapter: &str) { id: "test_task".to_string(), status: 1, // Invalid fetch_next_task: None, - max_retries: None, + max_attempts: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_err()); @@ -223,7 +223,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) { namespace: None, application: None, }), - max_retries: None, + max_attempts: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -259,7 +259,7 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) { application: Some("hammers".into()), namespace: None, }), - max_retries: None, + max_attempts: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -300,7 +300,7 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { application: Some("no-matches".into()), namespace: None, }), - max_retries: None, + max_attempts: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -329,7 +329,7 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte application: None, namespace: Some(namespace), }), - max_retries: None, + max_attempts: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); diff --git a/src/push/tests.rs b/src/push/tests.rs index 3349bdee..cb75f88f 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -100,12 +100,10 @@ impl InflightActivationStore for MockStore { &self, _id: &str, _status: InflightActivationStatus, + _max_attempts: Option, ) -> anyhow::Result> { Ok(None) } - async fn update_retry_state(&self, _id: &str, _max_retries: u32) -> anyhow::Result<()> { - Ok(()) - } async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { 0.0 } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 6c39428d..432208af 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -12,7 +12,7 @@ use async_backtrace::framed; use async_trait::async_trait; use chrono::{DateTime, Utc}; use prost::Message; -use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -631,15 +631,18 @@ impl InflightActivationStore for PostgresActivationStore { }) } - /// Update the status of a specific activation + /// Update the status of a specific activation. + /// If max_attempts is provided (for Retry status), also updates the activation's retry_state. #[instrument(skip_all)] #[framed] async fn set_status( &self, id: &str, status: InflightActivationStatus, + max_attempts: Option, ) -> Result, Error> { let mut conn = self.acquire_write_conn_metric("set_status").await?; + let result: Option = sqlx::query_as( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", ) @@ -652,48 +655,31 @@ impl InflightActivationStore for PostgresActivationStore { return Ok(None); }; - Ok(Some(row.into())) - } - - #[instrument(skip_all)] - #[framed] - async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("update_retry_state").await?; - - // Fetch the current activation - let row: Option = sqlx::query_as( - "SELECT *, kafka_offset AS offset FROM inflight_taskactivations WHERE id = $1", - ) - .bind(id) - .fetch_optional(&mut *conn) - .await?; - - let Some(row) = row else { - return Ok(()); - }; - - // Decode the activation, update retry_state, re-encode - let mut activation = TaskActivation::decode(&row.activation as &[u8])?; - - let retry_state = activation.retry_state.get_or_insert_with(|| RetryState { - attempts: 0, - max_attempts: 0, - on_attempts_exceeded: OnAttemptsExceeded::Discard.into(), - delay_on_retry: None, - at_most_once: Some(false), - }); - // max_retries excludes initial attempt, max_attempts includes it - retry_state.max_attempts = max_retries + 1; - - let updated_bytes = activation.encode_to_vec(); - - sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") - .bind(updated_bytes) - .bind(id) - .execute(&mut *conn) - .await?; + if let Some(max_attempts) = max_attempts { + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + + // Only update the blob if max_attempts actually changed. This should rarely + // happen after the first retry, since max_attempts comes from the task's + // retry decorator which stays constant across retries. + // For raw topics, retry_state starts as None so we create it on first retry. + let needs_update = activation + .retry_state + .as_ref() + .is_none_or(|rs| rs.max_attempts != max_attempts); + + if needs_update { + let retry_state = activation.retry_state.get_or_insert_default(); + retry_state.max_attempts = max_attempts; + + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(activation.encode_to_vec()) + .bind(id) + .execute(&mut *conn) + .await?; + } + } - Ok(()) + Ok(Some(row.into())) } #[instrument(skip_all)] diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index fd50e7f5..c8996be5 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -21,7 +21,7 @@ use libsqlite3_sys::{ SQLITE_OK, sqlite3_db_status, }; use prost::Message; -use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use tracing::{instrument, warn}; use crate::config::Config; @@ -684,14 +684,17 @@ impl InflightActivationStore for SqliteActivationStore { Ok(result.get::("count") as usize) } - /// Update the status of a specific activation + /// Update the status of a specific activation. + /// If max_attempts is provided (for Retry status), also updates the activation's retry_state. #[instrument(skip_all)] async fn set_status( &self, id: &str, status: InflightActivationStatus, + max_attempts: Option, ) -> Result, Error> { let mut conn = self.acquire_write_conn_metric("set_status").await?; + let result: Option = sqlx::query_as( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", ) @@ -704,46 +707,31 @@ impl InflightActivationStore for SqliteActivationStore { return Ok(None); }; - Ok(Some(row.into())) - } - - #[instrument(skip_all)] - async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error> { - let mut conn = self.acquire_write_conn_metric("update_retry_state").await?; - - // Fetch the current activation - let row: Option = - sqlx::query_as("SELECT * FROM inflight_taskactivations WHERE id = $1") - .bind(id) - .fetch_optional(&mut *conn) - .await?; - - let Some(row) = row else { - return Ok(()); - }; - - // Decode the activation, update retry_state, re-encode - let mut activation = TaskActivation::decode(&row.activation as &[u8])?; - - let retry_state = activation.retry_state.get_or_insert_with(|| RetryState { - attempts: 0, - max_attempts: 0, - on_attempts_exceeded: OnAttemptsExceeded::Discard.into(), - delay_on_retry: None, - at_most_once: Some(false), - }); - // max_retries excludes initial attempt, max_attempts includes it - retry_state.max_attempts = max_retries + 1; - - let updated_bytes = activation.encode_to_vec(); - - sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") - .bind(updated_bytes) - .bind(id) - .execute(&mut *conn) - .await?; + if let Some(max_attempts) = max_attempts { + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + + // Only update the blob if max_attempts actually changed. This should rarely + // happen after the first retry, since max_attempts comes from the task's + // retry decorator which stays constant across retries. + // For raw topics, retry_state starts as None so we create it on first retry. + let needs_update = activation + .retry_state + .as_ref() + .is_none_or(|rs| rs.max_attempts != max_attempts); + + if needs_update { + let retry_state = activation.retry_state.get_or_insert_default(); + retry_state.max_attempts = max_attempts; + + sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") + .bind(activation.encode_to_vec()) + .bind(id) + .execute(&mut *conn) + .await?; + } + } - Ok(()) + Ok(Some(row.into())) } #[instrument(skip_all)] diff --git a/src/store/tests.rs b/src/store/tests.rs index 18df83aa..ebba53bc 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -132,15 +132,15 @@ async fn test_count_depths(#[case] adapter: &str) { assert!(store.store(batch).await.is_ok()); store - .set_status("id_0", InflightActivationStatus::Processing) + .set_status("id_0", InflightActivationStatus::Processing, None) .await .unwrap(); store - .set_status("id_1", InflightActivationStatus::Delay) + .set_status("id_1", InflightActivationStatus::Delay, None) .await .unwrap(); store - .set_status("id_2", InflightActivationStatus::Complete) + .set_status("id_2", InflightActivationStatus::Complete, None) .await .unwrap(); @@ -739,7 +739,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None) .await .is_ok() ); @@ -755,7 +755,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Pending) + .set_status("id_0", InflightActivationStatus::Pending, None) .await .is_ok() ); @@ -769,13 +769,13 @@ async fn test_set_activation_status(#[case] adapter: &str) { .await; assert!( store - .set_status("id_0", InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None) .await .is_ok() ); assert!( store - .set_status("id_1", InflightActivationStatus::Failure) + .set_status("id_1", InflightActivationStatus::Failure, None) .await .is_ok() ); @@ -797,7 +797,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { ); let result = store - .set_status("not_there", InflightActivationStatus::Complete) + .set_status("not_there", InflightActivationStatus::Complete, None) .await; assert!(result.is_ok(), "no query error"); @@ -805,7 +805,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { assert!(activation.is_none(), "no activation found"); let result = store - .set_status("id_0", InflightActivationStatus::Complete) + .set_status("id_0", InflightActivationStatus::Complete, None) .await; assert!(result.is_ok(), "no query error"); @@ -837,7 +837,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None) .await .is_ok() ); @@ -852,7 +852,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Pending) + .set_status("id_0", InflightActivationStatus::Pending, None) .await .is_ok() ); @@ -866,13 +866,13 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { .await; assert!( store - .set_status("id_0", InflightActivationStatus::Failure) + .set_status("id_0", InflightActivationStatus::Failure, None) .await .is_ok() ); assert!( store - .set_status("id_1", InflightActivationStatus::Failure) + .set_status("id_1", InflightActivationStatus::Failure, None) .await .is_ok() ); @@ -896,7 +896,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { ); let result = store - .set_status("not_there", InflightActivationStatus::Complete) + .set_status("not_there", InflightActivationStatus::Complete, None) .await; assert!(result.is_ok(), "no query error"); @@ -904,7 +904,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { assert!(activation.is_none(), "no activation found"); let result = store - .set_status("id_0", InflightActivationStatus::Complete) + .set_status("id_0", InflightActivationStatus::Complete, None) .await; assert!(result.is_ok(), "no query error"); @@ -985,7 +985,7 @@ async fn test_get_retry_activations(#[case] adapter: &str) { assert!( store - .set_status("id_0", InflightActivationStatus::Retry) + .set_status("id_0", InflightActivationStatus::Retry, None) .await .is_ok() ); @@ -1001,7 +1001,7 @@ async fn test_get_retry_activations(#[case] adapter: &str) { assert!( store - .set_status("id_1", InflightActivationStatus::Retry) + .set_status("id_1", InflightActivationStatus::Retry, None) .await .is_ok() ); diff --git a/src/store/traits.rs b/src/store/traits.rs index 1f4a0e97..d968c51f 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -71,17 +71,15 @@ pub trait InflightActivationStore: Send + Sync { /// Record successful push. async fn mark_activation_processing(&self, id: &str) -> Result<(), Error>; - /// Update the status of a specific activation + /// Update the status of a specific activation. + /// If max_attempts is provided (for Retry status), also updates the activation's retry_state. async fn set_status( &self, id: &str, status: InflightActivationStatus, + max_attempts: Option, ) -> Result, Error>; - /// Update the retry_state in the activation blob. - /// Called when worker provides max_retries with a Retry status. - async fn update_retry_state(&self, id: &str, max_retries: u32) -> Result<(), Error>; - /// COUNT OPERATIONS /// Get the age of the oldest pending activation in seconds async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; From 4344e648713f44d1ae8a1629c67d75fed5bd9836 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 18 May 2026 16:51:08 +0200 Subject: [PATCH 04/10] style: Collapse nested if statements to fix clippy warnings Co-Authored-By: Claude Opus 4.5 --- src/grpc/server.rs | 16 ++++++++-------- src/kafka/deserialize.rs | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 747af568..618ea6fe 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -115,15 +115,15 @@ impl ConsumerService for TaskbrokerServer { }; // Use batching channel if available and we don't need to update retry state - if let Some(ref tx) = self.update_tx { - if max_attempts.is_none() { - tx.send((id, status)) - .await - .map_err(|_| Status::internal("Status update channel closed"))?; + if let Some(ref tx) = self.update_tx + && max_attempts.is_none() + { + tx.send((id, status)) + .await + .map_err(|_| Status::internal("Status update channel closed"))?; - metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); - return Ok(Response::new(SetTaskStatusResponse { task: None })); - } + metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); + return Ok(Response::new(SetTaskStatusResponse { task: None })); } match self.store.set_status(&id, status, max_attempts).await { diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 3679b1fd..8f7a73e1 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -40,10 +40,10 @@ pub fn new( move |msg: Arc| { // Messages from the retry topic are always activations - if let Some(ref retry_topic) = retry_topic { - if msg.topic() == retry_topic { - return activation_deserializer(msg); - } + if let Some(ref retry_topic) = retry_topic + && msg.topic() == retry_topic + { + return activation_deserializer(msg); } // For main topic: use raw deserializer in raw_mode, else activation deserializer From a085bbb51eb2f831bbed4f31a2a454584395706d Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 18 May 2026 17:31:59 +0200 Subject: [PATCH 05/10] better comment --- src/grpc/server.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 618ea6fe..b5668105 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -106,15 +106,11 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } - // If status is Retry and max_attempts is provided, pass it to set_status - // to update the activation's retry_state in the same DB operation. - let max_attempts = if status == InflightActivationStatus::Retry { - request.get_ref().max_attempts - } else { - None - }; + let max_attempts = request.get_ref().max_attempts; - // Use batching channel if available and we don't need to update retry state + // Use batching channel if available and we don't need to update retry state. + // If max_attempts is Some, we can't use batching API to update the activation, and have to + // fall back to individual set_status. if let Some(ref tx) = self.update_tx && max_attempts.is_none() { From 4bdfe18e2fa31327d3c83ec7e29a03ba5dc4253c Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 18 May 2026 17:33:37 +0200 Subject: [PATCH 06/10] chore: Remove unnecessary type: ignore comments The sentry-protos stubs now include max_attempts in SetTaskStatusRequest. Co-Authored-By: Claude Opus 4.5 --- clients/python/src/taskbroker_client/worker/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 01cc3788..1001c2ff 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -445,7 +445,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, - max_attempts=processing_result.max_attempts, # type: ignore[call-arg] + max_attempts=processing_result.max_attempts, ) try: @@ -567,7 +567,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=None, - max_attempts=processing_result.max_attempts, # type: ignore[call-arg] + max_attempts=processing_result.max_attempts, ) retries = 0 From 2322c199a314577964ef0f7b079021815b659037 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 20 May 2026 16:27:18 +0200 Subject: [PATCH 07/10] open transaction to fix race conditions, and add helper function to instrument transaction start --- src/store/adapters/postgres.rs | 32 +++++++++++++++++++++++++------- src/store/adapters/sqlite.rs | 31 ++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 30947c58..19b80654 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -5,7 +5,7 @@ use std::time::Instant; use sqlx::ConnectOptions; use sqlx::pool::PoolConnection; use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; -use sqlx::{FromRow, Pool, Postgres, QueryBuilder}; +use sqlx::{FromRow, Pool, Postgres, QueryBuilder, Transaction}; use anyhow::{Error, anyhow}; use async_backtrace::framed; @@ -184,10 +184,23 @@ impl PostgresActivationStore { ) -> Result, Error> { let start = Instant::now(); let conn = self.write_pool.acquire().await?; - metrics::histogram!("postgres.write.acquire_conn", "fn" => caller).record(start.elapsed()); + metrics::histogram!("postgres.write.acquire_conn", "fn" => caller, "mode" => "conn") + .record(start.elapsed()); Ok(conn) } + #[framed] + async fn begin_write_tx_metric( + &self, + caller: &'static str, + ) -> Result, Error> { + let start = Instant::now(); + let tx = self.write_pool.begin().await?; + metrics::histogram!("postgres.write.acquire_conn", "fn" => caller, "mode" => "begin") + .record(start.elapsed()); + Ok(tx) + } + #[framed] pub async fn new(config: PostgresActivationStoreConfig) -> Result { if config.run_migrations { @@ -641,17 +654,17 @@ impl InflightActivationStore for PostgresActivationStore { status: InflightActivationStatus, max_attempts: Option, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; + let mut tx = self.begin_write_tx_metric("set_status").await?; let result: Option = sqlx::query_as( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *, kafka_offset AS offset", ) .bind(status.to_string()) .bind(id) - .fetch_optional(&mut *conn) + .fetch_optional(&mut *tx) .await?; - let Some(row) = result else { + let Some(mut row) = result else { return Ok(None); }; @@ -671,14 +684,19 @@ impl InflightActivationStore for PostgresActivationStore { let retry_state = activation.retry_state.get_or_insert_default(); retry_state.max_attempts = max_attempts; + let updated_activation = activation.encode_to_vec(); sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") - .bind(activation.encode_to_vec()) + .bind(&updated_activation) .bind(id) - .execute(&mut *conn) + .execute(&mut *tx) .await?; + + row.activation = updated_activation; } } + tx.commit().await?; + Ok(Some(row.into())) } diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index da6efa47..267bc13c 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -7,7 +7,7 @@ use sqlx::sqlite::{ SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqliteRow, SqliteSynchronous, }; -use sqlx::{ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite}; +use sqlx::{ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite, Transaction}; use anyhow::{Error, anyhow}; use async_trait::async_trait; @@ -185,10 +185,22 @@ impl SqliteActivationStore { ) -> Result, Error> { let start = Instant::now(); let conn = self.write_pool.acquire().await?; - metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller).record(start.elapsed()); + metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller, "mode" => "conn") + .record(start.elapsed()); Ok(conn) } + async fn begin_write_tx_metric( + &self, + caller: &'static str, + ) -> Result, Error> { + let start = Instant::now(); + let tx = self.write_pool.begin().await?; + metrics::histogram!("sqlite.write.acquire_conn", "fn" => caller, "mode" => "begin") + .record(start.elapsed()); + Ok(tx) + } + async fn emit_db_status_metrics(&self) { if !self.config.enable_sqlite_status_metrics { return; @@ -693,17 +705,17 @@ impl InflightActivationStore for SqliteActivationStore { status: InflightActivationStatus, max_attempts: Option, ) -> Result, Error> { - let mut conn = self.acquire_write_conn_metric("set_status").await?; + let mut tx = self.begin_write_tx_metric("set_status").await?; let result: Option = sqlx::query_as( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", ) .bind(status) .bind(id) - .fetch_optional(&mut *conn) + .fetch_optional(&mut *tx) .await?; - let Some(row) = result else { + let Some(mut row) = result else { return Ok(None); }; @@ -723,14 +735,19 @@ impl InflightActivationStore for SqliteActivationStore { let retry_state = activation.retry_state.get_or_insert_default(); retry_state.max_attempts = max_attempts; + let updated_activation = activation.encode_to_vec(); sqlx::query("UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2") - .bind(activation.encode_to_vec()) + .bind(&updated_activation) .bind(id) - .execute(&mut *conn) + .execute(&mut *tx) .await?; + + row.activation = updated_activation; } } + tx.commit().await?; + Ok(Some(row.into())) } From 71ff30eeaea53e723a42cbcedbee3c0ef6a611bd Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 20 May 2026 19:00:01 +0200 Subject: [PATCH 08/10] fix: Update sentry-protos to 0.10.0 and merge main - Sync sentry-protos version across Rust (Cargo.toml) and Python (pyproject.toml) - Update uv.lock to use sentry-protos 0.10.0 (was 0.8.13) - Merge main to fix rebalance integration test Co-Authored-By: Claude Opus 4.5 --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- clients/python/pyproject.toml | 2 +- uv.lock | 6 +++--- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8d9be9e..41cc4966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2677,9 +2677,9 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.8.32" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cd6296e52dcf5924b93fc991863571eb9f1fd14267c737c0311a5743914f69" +checksum = "569c47479ab109239963a76f2fac10d20245a5f8c1f38300f9888fc8dae9c884" dependencies = [ "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index afa3f334..c9db167b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ sentry = { version = "0.41.0", default-features = false, features = [ "tracing", "logs" ] } -sentry_protos = "0.8.32" +sentry_protos = "0.10.0" serde = "1.0.214" serde_bytes = "0.11" serde_yaml = "0.9.34" diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index 5c626aab..8c577338 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" dependencies = [ "sentry-arroyo>=2.38.7", "sentry-sdk[http2]>=2.43.0", - "sentry-protos>=0.8.32", + "sentry-protos>=0.10.0", "confluent_kafka>=2.3.0", "cronsim>=2.6", "grpcio>=1.67.0", diff --git a/uv.lock b/uv.lock index 6380aaaa..65447763 100644 --- a/uv.lock +++ b/uv.lock @@ -599,7 +599,7 @@ wheels = [ [[package]] name = "sentry-protos" -version = "0.8.13" +version = "0.10.0" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "grpc-stubs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -607,7 +607,7 @@ dependencies = [ { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.8.13-py3-none-any.whl", hash = "sha256:8cebc86dbb20cea0157f488e0509bb854b8ec03f840ceb09445b2c4c2ee27d4f" }, + { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.10.0-py3-none-any.whl", hash = "sha256:38b7c19fcd8a5c5ea642ce568142416de4761a5b10b3daed972d7503aa7c4428" }, ] [[package]] @@ -753,7 +753,7 @@ requires-dist = [ { name = "redis", specifier = ">=3.4.1" }, { name = "redis-py-cluster", specifier = ">=2.1.0" }, { name = "sentry-arroyo", specifier = ">=2.38.7" }, - { name = "sentry-protos", specifier = ">=0.8.13" }, + { name = "sentry-protos", specifier = ">=0.8.32" }, { name = "sentry-sdk", extras = ["http2"], specifier = ">=2.43.0" }, { name = "setuptools", marker = "extra == 'examples'", specifier = ">=80.0" }, { name = "zstandard", specifier = ">=0.18.0" }, From 1fd5b47d211fbe332ee79849a5e0536606184cd5 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 20 May 2026 19:08:38 +0200 Subject: [PATCH 09/10] remove max_attempts_val, update lockfile --- clients/python/src/taskbroker_client/worker/workerchild.py | 4 +--- uv.lock | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 8d728cd8..9db1173f 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -287,7 +287,6 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: set_current_task(inflight.activation) next_state = TASK_ACTIVATION_STATUS_FAILURE - max_attempts_val: int | None = None # Use time.time() so we can measure against activation.received_at execution_start_time = time.time() try: @@ -315,7 +314,6 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: retry = task_func.retry if retry and retry.should_retry(inflight.activation.retry_state, err): next_state = TASK_ACTIVATION_STATUS_RETRY - max_attempts_val = retry._times + 1 else: next_state = TASK_ACTIVATION_STATUS_FAILURE except Exception as err: @@ -380,7 +378,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: status=next_state, host=inflight.host, receive_timestamp=inflight.receive_timestamp, - max_attempts=max_attempts_val, + max_attempts=task_func.retry._times + 1 if task_func.retry else None, ) ) diff --git a/uv.lock b/uv.lock index 65447763..daec9235 100644 --- a/uv.lock +++ b/uv.lock @@ -753,7 +753,7 @@ requires-dist = [ { name = "redis", specifier = ">=3.4.1" }, { name = "redis-py-cluster", specifier = ">=2.1.0" }, { name = "sentry-arroyo", specifier = ">=2.38.7" }, - { name = "sentry-protos", specifier = ">=0.8.32" }, + { name = "sentry-protos", specifier = ">=0.10.0" }, { name = "sentry-sdk", extras = ["http2"], specifier = ">=2.43.0" }, { name = "setuptools", marker = "extra == 'examples'", specifier = ">=80.0" }, { name = "zstandard", specifier = ">=0.18.0" }, From 0bf91053ac162ead26d660270a1b03a3f0c138c3 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 20 May 2026 19:19:23 +0200 Subject: [PATCH 10/10] send max_attempts less often, prevent retry topic from being the same as main topic in raw mode --- .../python/src/taskbroker_client/worker/workerchild.py | 9 ++++++++- src/kafka/deserialize_raw.rs | 7 +++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 9db1173f..cd265203 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -378,7 +378,14 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: status=next_state, host=inflight.host, receive_timestamp=inflight.receive_timestamp, - max_attempts=task_func.retry._times + 1 if task_func.retry else None, + # Send max_attempts if this is a retry. Don't send it + # on every task as this codepath is relatively + # unoptimized on the broker side. + max_attempts=( + task_func.retry._times + 1 + if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY + else None + ), ) ) diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 803f09a0..6e6bd742 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -44,6 +44,13 @@ impl RawConfig { application ); + if let Some(ref retry_topic) = config.kafka_retry_topic { + assert!( + retry_topic != &config.kafka_topic, + "kafka_retry_topic cannot equal kafka_topic when raw_mode is enabled" + ); + } + Some(Self { namespace: config .raw_namespace