diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 81c116ea..27aa5fca 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -60,6 +60,12 @@ def PushTask( context: grpc.ServicerContext, ) -> PushTaskResponse: """Handle incoming task activation.""" + start_time = time.monotonic() + self.worker._metrics.incr( + "taskworker.worker.push_rpc", + tags={"result": "attempt", "processing_pool": self.worker._processing_pool_name}, + ) + # Create `InflightTaskActivation` from the pushed task inflight = InflightTaskActivation( activation=request.task, @@ -69,8 +75,30 @@ def PushTask( # Push the task to the worker queue (wait at most 5 seconds) if not self.worker.push_task(inflight, timeout=5): + self.worker._metrics.incr( + "taskworker.worker.push_rpc", + tags={"result": "busy", "processing_pool": self.worker._processing_pool_name}, + ) + + self.worker._metrics.distribution( + "taskworker.worker.push_rpc.duration", + time.monotonic() - start_time, + tags={"result": "busy", "processing_pool": self.worker._processing_pool_name}, + ) + context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy") + self.worker._metrics.incr( + "taskworker.worker.push_rpc", + tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name}, + ) + + self.worker._metrics.distribution( + "taskworker.worker.push_rpc.duration", + time.monotonic() - start_time, + tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name}, + ) + return PushTaskResponse() diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 9919ee4d..38305020 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -48,13 +48,13 @@ pub fn bucket_range_for_fetch_thread(thread_index: usize, fetch_threads: usize) /// Thin interface for the push pool. It mostly serves to enable proper unit testing, but it also decouples fetch logic from push logic even further. #[async_trait] pub trait TaskPusher { - /// Push a single task to the worker service. - async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError>; + /// Submit a single task to the push pool. + async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError>; } #[async_trait] impl TaskPusher for PushPool { - async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError> { + async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> { self.submit(activation).await } } @@ -123,6 +123,7 @@ impl FetchPool { .await { Ok(activations) if activations.is_empty() => { + metrics::counter!("fetch.empty").increment(1); debug!("No pending activations"); // Wait for pending activations to appear @@ -130,34 +131,52 @@ impl FetchPool { } Ok(activations) => { + metrics::counter!("fetch.claimed") + .increment(activations.len() as u64); + metrics::histogram!("fetch.claim_batch_size") + .record(activations.len() as f64); + debug!("Fetched {} activations", activations.len()); for activation in activations { let id = activation.id.clone(); - if let Err(e) = pusher.push_task(activation).await { - match e { - PushError::Timeout => warn!( + match pusher.submit_task(activation).await { + Ok(()) => metrics::counter!("fetch.submit", "result" => "ok").increment(1), + + Err(PushError::Timeout) => { + metrics::counter!("fetch.submit", "result" => "timeout") + .increment(1); + + warn!( task_id = %id, "Submit to push pool timed out after {} milliseconds", config.push_queue_timeout_ms - ), + ); + + // Wait for push queue to empty + backoff = true; + } - PushError::Channel(e) => warn!( + Err(PushError::Channel(e)) => { + metrics::counter!("fetch.submit", "result" => "channel_error") + .increment(1); + + warn!( task_id = %id, error = ?e, "Submit to push pool failed due to channel error", - ) - } + ); - backoff = true; + // Wait before trying again + backoff = true; + } } } - - } Err(e) => { + metrics::counter!("fetch.store_error").increment(1); warn!( error = ?e, "Store failed while fetching tasks" diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index d9a64147..ded1e681 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -196,7 +196,7 @@ impl RecordingPusher { #[async_trait] impl TaskPusher for RecordingPusher { - async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError> { + async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> { self.pushed_ids.lock().await.push(activation.id.clone()); if self.fail { diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 0b963319..d4f5161e 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -100,18 +100,43 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } - let update_result = self.store.set_status(&id, status).await; - if let Err(e) = update_result { - error!( - ?id, - ?status, - "Unable to update status of activation: {:?}", - e, - ); - return Err(Status::internal(format!( - "Unable to update status of {id:?} to {status:?}" - ))); + match self.store.set_status(&id, status).await { + Ok(Some(_)) => metrics::counter!( + "grpc_server.set_status", + "result" => "ok", + "status" => status.to_string() + ) + .increment(1), + + Ok(None) => metrics::counter!( + "grpc_server.set_status", + "result" => "not_found", + "status" => status.to_string() + ) + .increment(1), + + Err(e) => { + metrics::counter!( + "grpc_server.set_status", + "result" => "error", + "status" => status.to_string() + ) + .increment(1); + + error!( + ?id, + ?status, + "Unable to update status of activation: {:?}", + e, + ); + + metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); + return Err(Status::internal(format!( + "Unable to update status of {id:?} to {status:?}" + ))); + } } + metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); if self.config.delivery_mode == DeliveryMode::Push { diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index bd70ab5a..8de67229 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -1000,7 +1000,7 @@ mod tests { self.pipe .write() .unwrap() - .extend(take(&mut self.buffer.write().unwrap() as &mut Vec).into_iter()); + .extend(take(&mut self.buffer.write().unwrap() as &mut Vec)); Ok(Some(())) } diff --git a/src/push/mod.rs b/src/push/mod.rs index 9c91b5be..f45efbd7 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -49,8 +49,7 @@ pub enum PushError { #[async_trait] trait WorkerClient { /// Send a single `PushTaskRequest` to the worker service. - /// - /// When `grpc_shared_secret` is non-empty, signs with `grpc_shared_secret[0]` and sets `sentry-signature` metadata (same scheme as Python pull client and broker `AuthLayer`). + /// When `grpc_shared_secret` is not empty, it signs the request with `grpc_shared_secret[0]` and sets `sentry-signature` metadata (same scheme as Python pull client and broker `AuthLayer`). async fn send(&mut self, request: PushTaskRequest, grpc_shared_secret: &[String]) -> Result<()>; } @@ -75,6 +74,7 @@ impl WorkerClient for WorkerServiceClient { self.push_task(req) .await .map_err(|status| anyhow::anyhow!(status))?; + Ok(()) } } @@ -128,9 +128,17 @@ impl PushPool { let grpc_shared_secret = self.config.grpc_shared_secret.clone(); async move { + metrics::counter!("push.worker.connect.attempt").increment(1); + let mut worker = match WorkerServiceClient::connect(endpoint).await { - Ok(w) => w, + Ok(w) => { + metrics::counter!("push.worker.connect", "result" => "ok").increment(1); + w + } + Err(e) => { + metrics::counter!("push.worker.connect", "result" => "error") + .increment(1); error!("Failed to connect to worker - {:?}", e); // When we fail to connect, the taskbroker will shut down, but this may change in the future @@ -167,9 +175,12 @@ impl PushPool { .await { Ok(_) => { + metrics::counter!("push.delivery", "result" => "ok").increment(1); debug!(task_id = %id, "Activation sent to worker"); if let Err(e) = store.mark_activation_processing(&id).await { + metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); + error!( task_id = %id, error = ?e, @@ -178,12 +189,16 @@ impl PushPool { } } - // Once processing deadline expires, status will be set back to pending - Err(e) => error!( - task_id = %id, - error = ?e, - "Failed to send activation to worker" - ) + // Once claim expires, status will be set back to pending + Err(e) => { + metrics::counter!("push.delivery", "result" => "error").increment(1); + + error!( + task_id = %id, + error = ?e, + "Failed to send activation to worker" + ) + } }; } } @@ -204,9 +219,12 @@ impl PushPool { .await { Ok(_) => { + metrics::counter!("push.delivery", "result" => "ok").increment(1); debug!(task_id = %id, "Activation sent to worker"); if let Err(e) = store.mark_activation_processing(&id).await { + metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1); + error!( task_id = %id, error = ?e, @@ -216,11 +234,16 @@ impl PushPool { } // Once processing deadline expires, status will be set back to pending - Err(e) => error!( - task_id = %id, - error = ?e, - "Failed to send activation to worker" - ), + Err(e) => { + metrics::counter!("push.delivery", "result" => "error") + .increment(1); + + error!( + task_id = %id, + error = ?e, + "Failed to send activation to worker" + ) + } }; } @@ -247,15 +270,27 @@ impl PushPool { /// Send an activation to the internal asynchronous MPMC channel used by all running push threads. Times out after `config.push_queue_timeout_ms` milliseconds. pub async fn submit(&self, activation: InflightActivation) -> Result<(), PushError> { let duration = Duration::from_millis(self.config.push_queue_timeout_ms); + let start = Instant::now(); + + metrics::gauge!("push.queue.depth").set(self.sender.len() as f64); match tokio::time::timeout(duration, self.sender.send_async(activation)).await { - Ok(Ok(())) => Ok(()), + Ok(Ok(())) => { + metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); + Ok(()) + } // The channel has a problem - Ok(Err(e)) => Err(PushError::Channel(e)), + Ok(Err(e)) => { + metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); + Err(PushError::Channel(e)) + } // The channel was full so the send timed out - Err(_elapsed) => Err(PushError::Timeout), + Err(_elapsed) => { + metrics::histogram!("push.queue.wait_duration").record(start.elapsed()); + Err(PushError::Timeout) + } } } } @@ -269,9 +304,16 @@ async fn push_task( grpc_shared_secret: &[String], ) -> Result<()> { let start = Instant::now(); + metrics::counter!("push.push_task.attempt").increment(1); // Try to decode activation (if it fails, we will see the error where `push_task` is called) - let task = TaskActivation::decode(&activation.activation as &[u8])?; + let task = match TaskActivation::decode(&activation.activation as &[u8]) { + Ok(task) => task, + Err(err) => { + metrics::histogram!("push.push_task.duration").record(start.elapsed()); + return Err(err.into()); + } + }; let request = PushTaskRequest { task: Some(task), diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 283461ba..77913711 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -502,10 +502,15 @@ impl InflightActivationStore for PostgresActivationStore { .await?; if result.rows_affected() == 0 { + metrics::counter!("push.mark_activation_processing", "result" => "not_found") + .increment(1); + warn!( task_id = %id, "Activation could not be marked as processing, it may be missing or its status may have already changed" ); + } else { + metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1); } Ok(()) diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 8d043d27..f26ddb5c 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -614,10 +614,15 @@ impl InflightActivationStore for SqliteActivationStore { .await?; if result.rows_affected() == 0 { + metrics::counter!("push.mark_activation_processing", "result" => "not_found") + .increment(1); + warn!( task_id = %id, "Activation could not be marked as sent, it may be missing or its status may have already changed" ); + } else { + metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1); } Ok(())