From fbd24531bde7d200ed915a8ce8298fd899423af8 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 14 Apr 2026 14:15:13 -0700 Subject: [PATCH 1/7] Add Useful Push Taskbroker Metrics --- .../src/taskbroker_client/worker/worker.py | 23 +++ src/fetch/mod.rs | 22 ++- src/grpc/metrics_middleware.rs | 6 +- src/grpc/server.rs | 42 +++-- src/push/mod.rs | 151 ++++++++++++++++-- src/store/inflight_activation.rs | 4 + src/store/postgres_activation_store.rs | 4 + 7 files changed, 220 insertions(+), 32 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 81c116ea..856bd17b 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -60,6 +60,11 @@ def PushTask( context: grpc.ServicerContext, ) -> PushTaskResponse: """Handle incoming task activation.""" + start_time = time.monotonic() + self.worker._metrics.incr( + "taskworker.worker.push_rpc", + tags={"result": "attempt", "processing_pool": self.worker._processing_pool_name}, + ) # Create `InflightTaskActivation` from the pushed task inflight = InflightTaskActivation( activation=request.task, @@ -69,8 +74,26 @@ def PushTask( # Push the task to the worker queue (wait at most 5 seconds) if not self.worker.push_task(inflight, timeout=5): + self.worker._metrics.incr( + "taskworker.worker.push_rpc", + tags={"result": "busy", "processing_pool": self.worker._processing_pool_name}, + ) + self.worker._metrics.distribution( + "taskworker.worker.push_rpc.duration", + time.monotonic() - start_time, + tags={"result": "busy", "processing_pool": self.worker._processing_pool_name}, + ) context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") + self.worker._metrics.incr( + "taskworker.worker.push_rpc", + tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name}, + ) + self.worker._metrics.distribution( + "taskworker.worker.push_rpc.duration", + time.monotonic() - start_time, + tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name}, + ) return PushTaskResponse() diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index d53a5519..6002f00e 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -121,6 +121,7 @@ impl FetchPool { .await { Ok(activations) if activations.is_empty() => { + metrics::counter!("push.fetch.empty").increment(1); debug!("No pending activations"); // Wait for pending activations to appear @@ -128,12 +129,27 @@ impl FetchPool { } Ok(activations) => { + metrics::counter!("push.fetch.claimed") + .increment(activations.len() as u64); + metrics::histogram!("push.fetch.claim_batch_size") + .record(activations.len() as f64); debug!("Fetched {} activations", activations.len()); for activation in activations { let id = activation.id.clone(); - if let Err(e) = pusher.push_task(activation).await { + match pusher.push_task(activation).await { + Ok(()) => { + metrics::counter!("push.fetch.submit", "result" => "ok") + .increment(1); + } + Err(e) => { + let reason = match &e { + PushError::Timeout => "timeout", + PushError::Channel(_) => "channel_error", + }; + metrics::counter!("push.fetch.submit", "result" => reason) + .increment(1); match e { PushError::Timeout => warn!( task_id = %id, @@ -149,13 +165,13 @@ impl FetchPool { } backoff = true; + } } } - - } Err(e) => { + metrics::counter!("push.fetch.store_error").increment(1); warn!( error = ?e, "Store failed while fetching tasks" diff --git a/src/grpc/metrics_middleware.rs b/src/grpc/metrics_middleware.rs index e8dd8e9c..6584f1ee 100644 --- a/src/grpc/metrics_middleware.rs +++ b/src/grpc/metrics_middleware.rs @@ -43,9 +43,11 @@ where let path = req.uri().path().to_string(); let response = inner.call(req).await?; + let status = response.status().as_u16().to_string(); - metrics::counter!("grpc.request", "path" => path.clone()).increment(1); - metrics::histogram!("grpc.request.duration", "path" => path.clone()) + metrics::counter!("grpc.request", "path" => path.clone(), "status" => status.clone()) + .increment(1); + metrics::histogram!("grpc.request.duration", "path" => path.clone(), "status" => status) .record(start.elapsed()); Ok(response) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index a9e13557..934cd1d2 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -100,17 +100,37 @@ impl ConsumerService for TaskbrokerServer { } let update_result = self.store.set_status(&id, status).await; - if let Err(e) = update_result { - error!( - ?id, - ?status, - "Unable to update status of activation: {:?}", - e, - ); - return Err(Status::internal(format!( - "Unable to update status of {id:?} to {status:?}" - ))); - } + let inflight = match update_result { + Ok(inflight) => inflight, + Err(e) => { + metrics::counter!( + "grpc_server.set_status", + "result" => "error", + "status" => status.to_string() + ) + .increment(1); + error!( + ?id, + ?status, + "Unable to update status of activation: {:?}", + e, + ); + return Err(Status::internal(format!( + "Unable to update status of {id:?} to {status:?}" + ))); + } + }; + let result_label = if inflight.is_some() { + "ok" + } else { + "not_found" + }; + metrics::counter!( + "grpc_server.set_status", + "result" => result_label, + "status" => status.to_string() + ) + .increment(1); metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); if self.config.delivery_mode == DeliveryMode::Push { diff --git a/src/push/mod.rs b/src/push/mod.rs index c552e7b5..5583b55d 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -10,6 +10,7 @@ use sentry_protos::taskbroker::v1::worker_service_client::WorkerServiceClient; use sentry_protos::taskbroker::v1::{PushTaskRequest, TaskActivation}; use sha2::Sha256; use tokio::task::JoinSet; +use tonic::Code; use tonic::async_trait; use tonic::metadata::MetadataValue; use tonic::transport::Channel; @@ -127,9 +128,15 @@ impl PushPool { let grpc_shared_secret = self.config.grpc_shared_secret.clone(); async move { + metrics::counter!("push.worker.connect.attempt").increment(1); let mut worker = match WorkerServiceClient::connect(endpoint).await { - Ok(w) => w, + Ok(w) => { + metrics::counter!("push.worker.connect", "result" => "ok").increment(1); + w + } Err(e) => { + metrics::counter!("push.worker.connect", "result" => "error") + .increment(1); error!("Failed to connect to worker - {:?}", e); // When we fail to connect, the taskbroker will shut down, but this may change in the future @@ -147,7 +154,10 @@ impl PushPool { message = receiver.recv_async() => { let activation = match message { // Received activation from fetch thread - Ok(a) => a, + Ok(a) => { + metrics::gauge!("push.queue.depth").set(receiver.len() as f64); + a + } // Channel closed Err(_) => break @@ -166,9 +176,11 @@ impl PushPool { .await { Ok(_) => { + metrics::counter!("push.delivery", "result" => "ok").increment(1); debug!(task_id = %id, "Activation sent to worker"); if let Err(e) = store.mark_activation_processing(&id).await { + metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); error!( task_id = %id, error = ?e, @@ -178,11 +190,21 @@ impl PushPool { } // Once processing deadline expires, status will be set back to pending - Err(e) => error!( - task_id = %id, - error = ?e, - "Failed to send activation to worker" - ) + Err(e) => { + let (kind, status) = classify_push_error(&e); + metrics::counter!( + "push.delivery", + "result" => "error", + "error_kind" => kind, + "grpc_status" => status + ) + .increment(1); + error!( + task_id = %id, + error = ?e, + "Failed to send activation to worker" + ) + } }; } } @@ -203,9 +225,11 @@ impl PushPool { .await { Ok(_) => { + metrics::counter!("push.delivery", "result" => "ok").increment(1); debug!(task_id = %id, "Activation sent to worker"); if let Err(e) = store.mark_activation_processing(&id).await { + metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); error!( task_id = %id, error = ?e, @@ -215,11 +239,21 @@ impl PushPool { } // Once processing deadline expires, status will be set back to pending - Err(e) => error!( - task_id = %id, - error = ?e, - "Failed to send activation to worker" - ), + Err(e) => { + let (kind, status) = classify_push_error(&e); + metrics::counter!( + "push.delivery", + "result" => "error", + "error_kind" => kind, + "grpc_status" => status + ) + .increment(1); + error!( + task_id = %id, + error = ?e, + "Failed to send activation to worker" + ) + } }; } @@ -246,15 +280,29 @@ impl PushPool { /// Send an activation to the internal asynchronous MPMC channel used by all running push threads. Times out after `config.push_queue_timeout_ms` milliseconds. pub async fn submit(&self, activation: InflightActivation) -> Result<(), PushError> { let duration = Duration::from_millis(self.config.push_queue_timeout_ms); + let start = Instant::now(); + metrics::gauge!("push.queue.depth").set(self.sender.len() as f64); match tokio::time::timeout(duration, self.sender.send_async(activation)).await { - Ok(Ok(())) => Ok(()), + Ok(Ok(())) => { + metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); + metrics::gauge!("push.queue.depth").set(self.sender.len() as f64); + Ok(()) + } // The channel has a problem - Ok(Err(e)) => Err(PushError::Channel(e)), + Ok(Err(e)) => { + metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); + metrics::counter!("push.queue.submit", "result" => "channel_error").increment(1); + Err(PushError::Channel(e)) + } // The channel was full so the send timed out - Err(_elapsed) => Err(PushError::Timeout), + Err(_elapsed) => { + metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); + metrics::counter!("push.queue.submit", "result" => "timeout").increment(1); + Err(PushError::Timeout) + } } } } @@ -268,9 +316,22 @@ async fn push_task( grpc_shared_secret: &[String], ) -> Result<()> { let start = Instant::now(); + metrics::counter!("push.rpc.attempt").increment(1); // Try to decode activation (if it fails, we will see the error where `push_task` is called) - let task = TaskActivation::decode(&activation.activation as &[u8])?; + let task = match TaskActivation::decode(&activation.activation as &[u8]) { + Ok(task) => task, + Err(err) => { + metrics::counter!( + "push.rpc.failure", + "error_kind" => "decode", + "grpc_status" => "none" + ) + .increment(1); + metrics::histogram!("push.rpc.duration", "result" => "error").record(start.elapsed()); + return Err(err.into()); + } + }; let request = PushTaskRequest { task: Some(task), @@ -283,9 +344,67 @@ async fn push_task( Err(e) => Err(e.into()), }; + match &result { + Ok(()) => metrics::counter!("push.rpc.success").increment(1), + Err(err) => { + let (kind, status) = classify_push_error(err); + metrics::counter!( + "push.rpc.failure", + "error_kind" => kind, + "grpc_status" => status + ) + .increment(1); + } + } + metrics::histogram!( + "push.rpc.duration", + "result" => if result.is_ok() { "ok" } else { "error" } + ) + .record(start.elapsed()); metrics::histogram!("push.push_task.duration").record(start.elapsed()); result } +fn classify_push_error(error: &anyhow::Error) -> (&'static str, &'static str) { + if error + .downcast_ref::() + .is_some() + { + return ("timeout", "deadline_exceeded"); + } + + if let Some(status) = error.downcast_ref::() { + return ("grpc", tonic_code_name(status.code())); + } + + if error.downcast_ref::().is_some() { + return ("decode", "none"); + } + + ("other", "none") +} + +fn tonic_code_name(code: Code) -> &'static str { + match code { + Code::Ok => "ok", + Code::Cancelled => "cancelled", + Code::Unknown => "unknown", + Code::InvalidArgument => "invalid_argument", + Code::DeadlineExceeded => "deadline_exceeded", + Code::NotFound => "not_found", + Code::AlreadyExists => "already_exists", + Code::PermissionDenied => "permission_denied", + Code::ResourceExhausted => "resource_exhausted", + Code::FailedPrecondition => "failed_precondition", + Code::Aborted => "aborted", + Code::OutOfRange => "out_of_range", + Code::Unimplemented => "unimplemented", + Code::Internal => "internal", + Code::Unavailable => "unavailable", + Code::DataLoss => "data_loss", + Code::Unauthenticated => "unauthenticated", + } +} + #[cfg(test)] mod tests; diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index e65f4541..f95015f8 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1011,10 +1011,14 @@ impl InflightActivationStore for SqliteActivationStore { .await?; if result.rows_affected() == 0 { + metrics::counter!("push.mark_activation_processing", "result" => "not_found") + .increment(1); warn!( task_id = %id, "Activation could not be marked as sent, it may be missing or its status may have already changed" ); + } else { + metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1); } Ok(()) diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index 051855db..bb36a564 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -421,10 +421,14 @@ impl InflightActivationStore for PostgresActivationStore { .await?; if result.rows_affected() == 0 { + metrics::counter!("push.mark_activation_processing", "result" => "not_found") + .increment(1); warn!( task_id = %id, "Activation could not be marked as processing, it may be missing or its status may have already changed" ); + } else { + metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1); } Ok(()) From 2ae4a59b9d1555f2ad55801ee2d2b3f7773376dc Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 15 Apr 2026 11:49:07 -0700 Subject: [PATCH 2/7] Metrics Tweaks --- src/fetch/mod.rs | 44 ++++++++-------- src/fetch/tests.rs | 2 +- src/push/mod.rs | 124 ++++++++++++--------------------------------- 3 files changed, 54 insertions(+), 116 deletions(-) diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 6002f00e..7be1bbab 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -46,13 +46,13 @@ pub fn bucket_range_for_fetch_thread(thread_index: usize, fetch_threads: usize) /// Thin interface for the push pool. It mostly serves to enable proper unit testing, but it also decouples fetch logic from push logic even further. #[async_trait] pub trait TaskPusher { - /// Push a single task to the worker service. - async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError>; + /// Submit a single task to the push pool. + async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError>; } #[async_trait] impl TaskPusher for PushPool { - async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError> { + async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> { self.submit(activation).await } } @@ -133,38 +133,38 @@ impl FetchPool { .increment(activations.len() as u64); metrics::histogram!("push.fetch.claim_batch_size") .record(activations.len() as f64); + debug!("Fetched {} activations", activations.len()); for activation in activations { let id = activation.id.clone(); - match pusher.push_task(activation).await { - Ok(()) => { - metrics::counter!("push.fetch.submit", "result" => "ok") - .increment(1); - } - Err(e) => { - let reason = match &e { - PushError::Timeout => "timeout", - PushError::Channel(_) => "channel_error", - }; - metrics::counter!("push.fetch.submit", "result" => reason) + match pusher.submit_task(activation).await { + Ok(()) => metrics::counter!("push.fetch.submit", "result" => "ok").increment(1), + + Err(PushError::Timeout) => { + metrics::counter!("push.fetch.submit", "result" => "timeout") .increment(1); - match e { - PushError::Timeout => warn!( + + warn!( task_id = %id, "Submit to push pool timed out after {} milliseconds", config.push_queue_timeout_ms - ), + ); - PushError::Channel(e) => warn!( + // Wait for push queue to empty + backoff = true; + } + + Err(PushError::Channel(e)) => { + metrics::counter!("push.fetch.submit", "result" => "channel_error") + .increment(1); + + warn!( task_id = %id, error = ?e, "Submit to push pool failed due to channel error", - ) - } - - backoff = true; + ); } } } diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index fc652bc1..e85c051e 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -198,7 +198,7 @@ impl RecordingPusher { #[async_trait] impl TaskPusher for RecordingPusher { - async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError> { + async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> { self.pushed_ids.lock().await.push(activation.id.clone()); if self.fail { diff --git a/src/push/mod.rs b/src/push/mod.rs index 5583b55d..36a68954 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -10,7 +10,6 @@ use sentry_protos::taskbroker::v1::worker_service_client::WorkerServiceClient; use sentry_protos::taskbroker::v1::{PushTaskRequest, TaskActivation}; use sha2::Sha256; use tokio::task::JoinSet; -use tonic::Code; use tonic::async_trait; use tonic::metadata::MetadataValue; use tonic::transport::Channel; @@ -129,11 +128,13 @@ impl PushPool { async move { metrics::counter!("push.worker.connect.attempt").increment(1); + let mut worker = match WorkerServiceClient::connect(endpoint).await { Ok(w) => { metrics::counter!("push.worker.connect", "result" => "ok").increment(1); w } + Err(e) => { metrics::counter!("push.worker.connect", "result" => "error") .increment(1); @@ -154,10 +155,7 @@ impl PushPool { message = receiver.recv_async() => { let activation = match message { // Received activation from fetch thread - Ok(a) => { - metrics::gauge!("push.queue.depth").set(receiver.len() as f64); - a - } + Ok(a) => a, // Channel closed Err(_) => break @@ -181,6 +179,7 @@ impl PushPool { if let Err(e) = store.mark_activation_processing(&id).await { metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); + error!( task_id = %id, error = ?e, @@ -189,16 +188,10 @@ impl PushPool { } } - // Once processing deadline expires, status will be set back to pending + // Once claim expires, status will be set back to pending Err(e) => { - let (kind, status) = classify_push_error(&e); - metrics::counter!( - "push.delivery", - "result" => "error", - "error_kind" => kind, - "grpc_status" => status - ) - .increment(1); + metrics::counter!("push.delivery", "result" => "error").increment(1); + error!( task_id = %id, error = ?e, @@ -230,6 +223,7 @@ impl PushPool { if let Err(e) = store.mark_activation_processing(&id).await { metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); + error!( task_id = %id, error = ?e, @@ -240,14 +234,9 @@ impl PushPool { // Once processing deadline expires, status will be set back to pending Err(e) => { - let (kind, status) = classify_push_error(&e); - metrics::counter!( - "push.delivery", - "result" => "error", - "error_kind" => kind, - "grpc_status" => status - ) - .increment(1); + metrics::counter!("push.delivery", "result" => "error") + .increment(1); + error!( task_id = %id, error = ?e, @@ -281,26 +270,24 @@ impl PushPool { pub async fn submit(&self, activation: InflightActivation) -> Result<(), PushError> { let duration = Duration::from_millis(self.config.push_queue_timeout_ms); let start = Instant::now(); + metrics::gauge!("push.queue.depth").set(self.sender.len() as f64); match tokio::time::timeout(duration, self.sender.send_async(activation)).await { Ok(Ok(())) => { metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); - metrics::gauge!("push.queue.depth").set(self.sender.len() as f64); Ok(()) } // The channel has a problem Ok(Err(e)) => { metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); - metrics::counter!("push.queue.submit", "result" => "channel_error").increment(1); Err(PushError::Channel(e)) } // The channel was full so the send timed out Err(_elapsed) => { metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); - metrics::counter!("push.queue.submit", "result" => "timeout").increment(1); Err(PushError::Timeout) } } @@ -316,19 +303,16 @@ async fn push_task( grpc_shared_secret: &[String], ) -> Result<()> { let start = Instant::now(); - metrics::counter!("push.rpc.attempt").increment(1); + metrics::counter!("push.push_task.attempt").increment(1); // Try to decode activation (if it fails, we will see the error where `push_task` is called) let task = match TaskActivation::decode(&activation.activation as &[u8]) { Ok(task) => task, Err(err) => { - metrics::counter!( - "push.rpc.failure", - "error_kind" => "decode", - "grpc_status" => "none" - ) - .increment(1); - metrics::histogram!("push.rpc.duration", "result" => "error").record(start.elapsed()); + metrics::counter!("push.push_task.failure", "reason" => "decode").increment(1); + metrics::histogram!("push.push_task.duration", "result" => "error") + .record(start.elapsed()); + return Err(err.into()); } }; @@ -340,70 +324,24 @@ async fn push_task( let result = match tokio::time::timeout(timeout, worker.send(request, grpc_shared_secret)).await { - Ok(r) => r, - Err(e) => Err(e.into()), - }; + Ok(result) => { + if result.is_ok() { + metrics::counter!("push.push_task.success").increment(1); + } else { + metrics::counter!("push.push_task.failure", "reason" => "rpc").increment(1); + } - match &result { - Ok(()) => metrics::counter!("push.rpc.success").increment(1), - Err(err) => { - let (kind, status) = classify_push_error(err); - metrics::counter!( - "push.rpc.failure", - "error_kind" => kind, - "grpc_status" => status - ) - .increment(1); + result } - } - metrics::histogram!( - "push.rpc.duration", - "result" => if result.is_ok() { "ok" } else { "error" } - ) - .record(start.elapsed()); - metrics::histogram!("push.push_task.duration").record(start.elapsed()); - result -} - -fn classify_push_error(error: &anyhow::Error) -> (&'static str, &'static str) { - if error - .downcast_ref::() - .is_some() - { - return ("timeout", "deadline_exceeded"); - } - - if let Some(status) = error.downcast_ref::() { - return ("grpc", tonic_code_name(status.code())); - } - if error.downcast_ref::().is_some() { - return ("decode", "none"); - } - - ("other", "none") -} + Err(e) => { + metrics::counter!("push.push_task.failure", "reason" => "timeout").increment(1); + Err(e.into()) + } + }; -fn tonic_code_name(code: Code) -> &'static str { - match code { - Code::Ok => "ok", - Code::Cancelled => "cancelled", - Code::Unknown => "unknown", - Code::InvalidArgument => "invalid_argument", - Code::DeadlineExceeded => "deadline_exceeded", - Code::NotFound => "not_found", - Code::AlreadyExists => "already_exists", - Code::PermissionDenied => "permission_denied", - Code::ResourceExhausted => "resource_exhausted", - Code::FailedPrecondition => "failed_precondition", - Code::Aborted => "aborted", - Code::OutOfRange => "out_of_range", - Code::Unimplemented => "unimplemented", - Code::Internal => "internal", - Code::Unavailable => "unavailable", - Code::DataLoss => "data_loss", - Code::Unauthenticated => "unauthenticated", - } + metrics::histogram!("push.push_task.duration").record(start.elapsed()); + result } #[cfg(test)] From 115b8f0a2e11ca7e3721487eb698c96ddf648cf9 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 15 Apr 2026 11:58:10 -0700 Subject: [PATCH 3/7] More Metrics Tweaks --- src/grpc/server.rs | 32 +++++++++++++++----------- src/store/inflight_activation.rs | 1 + src/store/postgres_activation_store.rs | 1 + 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 934cd1d2..e4efee44 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -99,9 +99,21 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } - let update_result = self.store.set_status(&id, status).await; - let inflight = match update_result { - Ok(inflight) => inflight, + match self.store.set_status(&id, status).await { + Ok(Some(_)) => metrics::counter!( + "grpc_server.set_status", + "result" => "ok", + "status" => status.to_string() + ) + .increment(1), + + Ok(None) => metrics::counter!( + "grpc_server.set_status", + "result" => "not_found", + "status" => status.to_string() + ) + .increment(1), + Err(e) => { metrics::counter!( "grpc_server.set_status", @@ -109,28 +121,20 @@ impl ConsumerService for TaskbrokerServer { "status" => status.to_string() ) .increment(1); + error!( ?id, ?status, "Unable to update status of activation: {:?}", e, ); + return Err(Status::internal(format!( "Unable to update status of {id:?} to {status:?}" ))); } }; - let result_label = if inflight.is_some() { - "ok" - } else { - "not_found" - }; - metrics::counter!( - "grpc_server.set_status", - "result" => result_label, - "status" => status.to_string() - ) - .increment(1); + metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); if self.config.delivery_mode == DeliveryMode::Push { diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index f95015f8..1394ebe3 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1013,6 +1013,7 @@ impl InflightActivationStore for SqliteActivationStore { if result.rows_affected() == 0 { metrics::counter!("push.mark_activation_processing", "result" => "not_found") .increment(1); + warn!( task_id = %id, "Activation could not be marked as sent, it may be missing or its status may have already changed" diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index bb36a564..26f6e499 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -423,6 +423,7 @@ impl InflightActivationStore for PostgresActivationStore { if result.rows_affected() == 0 { metrics::counter!("push.mark_activation_processing", "result" => "not_found") .increment(1); + warn!( task_id = %id, "Activation could not be marked as processing, it may be missing or its status may have already changed" From a16f3bc35da1ea308ea779c6ecd877e7e4b8ff48 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 15 Apr 2026 12:05:32 -0700 Subject: [PATCH 4/7] Final (?) Metrics Tweaks --- .../src/taskbroker_client/worker/worker.py | 5 ++++ src/push/mod.rs | 25 ++++--------------- 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 856bd17b..27aa5fca 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -65,6 +65,7 @@ def PushTask( "taskworker.worker.push_rpc", tags={"result": "attempt", "processing_pool": self.worker._processing_pool_name}, ) + # Create `InflightTaskActivation` from the pushed task inflight = InflightTaskActivation( activation=request.task, @@ -78,22 +79,26 @@ def PushTask( "taskworker.worker.push_rpc", tags={"result": "busy", "processing_pool": self.worker._processing_pool_name}, ) + self.worker._metrics.distribution( "taskworker.worker.push_rpc.duration", time.monotonic() - start_time, tags={"result": "busy", "processing_pool": self.worker._processing_pool_name}, ) + context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") self.worker._metrics.incr( "taskworker.worker.push_rpc", tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name}, ) + self.worker._metrics.distribution( "taskworker.worker.push_rpc.duration", time.monotonic() - start_time, tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name}, ) + return PushTaskResponse() diff --git a/src/push/mod.rs b/src/push/mod.rs index 36a68954..bb136cc3 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -48,8 +48,7 @@ pub enum PushError { #[async_trait] trait WorkerClient { /// Send a single `PushTaskRequest` to the worker service. - /// - /// When `grpc_shared_secret` is non-empty, signs with `grpc_shared_secret[0]` and sets `sentry-signature` metadata (same scheme as Python pull client and broker `AuthLayer`). + /// When `grpc_shared_secret` is not empty, it signs the request with `grpc_shared_secret[0]` and sets `sentry-signature` metadata (same scheme as Python pull client and broker `AuthLayer`). async fn send(&mut self, request: PushTaskRequest, grpc_shared_secret: &[String]) -> Result<()>; } @@ -74,6 +73,7 @@ impl WorkerClient for WorkerServiceClient { self.push_task(req) .await .map_err(|status| anyhow::anyhow!(status))?; + Ok(()) } } @@ -309,10 +309,7 @@ async fn push_task( let task = match TaskActivation::decode(&activation.activation as &[u8]) { Ok(task) => task, Err(err) => { - metrics::counter!("push.push_task.failure", "reason" => "decode").increment(1); - metrics::histogram!("push.push_task.duration", "result" => "error") - .record(start.elapsed()); - + metrics::histogram!("push.push_task.duration").record(start.elapsed()); return Err(err.into()); } }; @@ -324,20 +321,8 @@ async fn push_task( let result = match tokio::time::timeout(timeout, worker.send(request, grpc_shared_secret)).await { - Ok(result) => { - if result.is_ok() { - metrics::counter!("push.push_task.success").increment(1); - } else { - metrics::counter!("push.push_task.failure", "reason" => "rpc").increment(1); - } - - result - } - - Err(e) => { - metrics::counter!("push.push_task.failure", "reason" => "timeout").increment(1); - Err(e.into()) - } + Ok(r) => r, + Err(e) => Err(e.into()), }; metrics::histogram!("push.push_task.duration").record(start.elapsed()); From 2320bd43c92ce7043096e9ce66235dd45a530b8a Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 15 Apr 2026 14:45:04 -0700 Subject: [PATCH 5/7] Address AI Comments --- src/fetch/mod.rs | 3 +++ src/grpc/metrics_middleware.rs | 6 ++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index e4e35536..9527c787 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -167,6 +167,9 @@ impl FetchPool { error = ?e, "Submit to push pool failed due to channel error", ); + + // Wait before trying again + backoff = true; } } } diff --git a/src/grpc/metrics_middleware.rs b/src/grpc/metrics_middleware.rs index 6584f1ee..e8dd8e9c 100644 --- a/src/grpc/metrics_middleware.rs +++ b/src/grpc/metrics_middleware.rs @@ -43,11 +43,9 @@ where let path = req.uri().path().to_string(); let response = inner.call(req).await?; - let status = response.status().as_u16().to_string(); - metrics::counter!("grpc.request", "path" => path.clone(), "status" => status.clone()) - .increment(1); - metrics::histogram!("grpc.request.duration", "path" => path.clone(), "status" => status) + metrics::counter!("grpc.request", "path" => path.clone()).increment(1); + metrics::histogram!("grpc.request.duration", "path" => path.clone()) .record(start.elapsed()); Ok(response) From 768fa110842374fc65e9a9a093074424ab528698 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 16 Apr 2026 09:01:42 -0700 Subject: [PATCH 6/7] Change Fetch Metrics Prefix, Emit `set_status` Duration on All Paths --- src/fetch/mod.rs | 14 +++++++------- src/grpc/server.rs | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 9527c787..38305020 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -123,7 +123,7 @@ impl FetchPool { .await { Ok(activations) if activations.is_empty() => { - metrics::counter!("push.fetch.empty").increment(1); + metrics::counter!("fetch.empty").increment(1); debug!("No pending activations"); // Wait for pending activations to appear @@ -131,9 +131,9 @@ impl FetchPool { } Ok(activations) => { - metrics::counter!("push.fetch.claimed") + metrics::counter!("fetch.claimed") .increment(activations.len() as u64); - metrics::histogram!("push.fetch.claim_batch_size") + metrics::histogram!("fetch.claim_batch_size") .record(activations.len() as f64); debug!("Fetched {} activations", activations.len()); @@ -142,10 +142,10 @@ impl FetchPool { let id = activation.id.clone(); match pusher.submit_task(activation).await { - Ok(()) => metrics::counter!("push.fetch.submit", "result" => "ok").increment(1), + Ok(()) => metrics::counter!("fetch.submit", "result" => "ok").increment(1), Err(PushError::Timeout) => { - metrics::counter!("push.fetch.submit", "result" => "timeout") + metrics::counter!("fetch.submit", "result" => "timeout") .increment(1); warn!( @@ -159,7 +159,7 @@ impl FetchPool { } Err(PushError::Channel(e)) => { - metrics::counter!("push.fetch.submit", "result" => "channel_error") + metrics::counter!("fetch.submit", "result" => "channel_error") .increment(1); warn!( @@ -176,7 +176,7 @@ impl FetchPool { } Err(e) => { - metrics::counter!("push.fetch.store_error").increment(1); + metrics::counter!("fetch.store_error").increment(1); warn!( error = ?e, "Store failed while fetching tasks" diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 94600e6d..d4f5161e 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -130,11 +130,12 @@ impl ConsumerService for TaskbrokerServer { e, ); + metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); return Err(Status::internal(format!( "Unable to update status of {id:?} to {status:?}" ))); } - }; + } metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); From 15cdddede925380522705762be9df991cf4c71f2 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 16 Apr 2026 09:20:11 -0700 Subject: [PATCH 7/7] Lint w/New Clippy Version --- src/kafka/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index bd70ab5a..8de67229 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -1000,7 +1000,7 @@ mod tests { self.pipe .write() .unwrap() - .extend(take(&mut self.buffer.write().unwrap() as &mut Vec).into_iter()); + .extend(take(&mut self.buffer.write().unwrap() as &mut Vec)); Ok(Some(())) }