Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def PushTask(
context: grpc.ServicerContext,
) -> PushTaskResponse:
"""Handle incoming task activation."""
start_time = time.monotonic()
self.worker._metrics.incr(
"taskworker.worker.push_rpc",
tags={"result": "attempt", "processing_pool": self.worker._processing_pool_name},
)

# Create `InflightTaskActivation` from the pushed task
inflight = InflightTaskActivation(
activation=request.task,
Expand All @@ -69,8 +75,30 @@ def PushTask(

# Push the task to the worker queue (wait at most 5 seconds)
if not self.worker.push_task(inflight, timeout=5):
self.worker._metrics.incr(
"taskworker.worker.push_rpc",
tags={"result": "busy", "processing_pool": self.worker._processing_pool_name},
)

self.worker._metrics.distribution(
"taskworker.worker.push_rpc.duration",
time.monotonic() - start_time,
tags={"result": "busy", "processing_pool": self.worker._processing_pool_name},
)

context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, "worker busy")
Comment thread
george-sentry marked this conversation as resolved.

self.worker._metrics.incr(
"taskworker.worker.push_rpc",
tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name},
)

self.worker._metrics.distribution(
"taskworker.worker.push_rpc.duration",
time.monotonic() - start_time,
tags={"result": "accepted", "processing_pool": self.worker._processing_pool_name},
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metrics after abort lack structural mutual exclusivity

Low Severity

The "accepted" metrics on lines 88–96 are emitted unconditionally after the if block, relying on context.abort() raising an exception to prevent them from firing in the busy case. While this works in production gRPC, the existing test test_push_task_worker_busy uses a MagicMock for context, where abort() does not raise — meaning both "busy" and "accepted" metrics fire for the same request during testing. Using an else branch (or adding a return after context.abort()) would make the mutual exclusivity between "busy" and "accepted" structurally guaranteed rather than reliant on a side effect.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit fbd2453. Configure here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metrics being wrong when mocks are used is fine imo. The mocks should raise


Comment thread
george-sentry marked this conversation as resolved.
return PushTaskResponse()


Expand Down
45 changes: 32 additions & 13 deletions src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ pub fn bucket_range_for_fetch_thread(thread_index: usize, fetch_threads: usize)
/// Thin interface for the push pool. It mostly serves to enable proper unit testing, but it also decouples fetch logic from push logic even further.
#[async_trait]
pub trait TaskPusher {
/// Push a single task to the worker service.
async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError>;
/// Submit a single task to the push pool.
async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError>;
}

#[async_trait]
impl TaskPusher for PushPool {
async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError> {
async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> {
self.submit(activation).await
}
}
Expand Down Expand Up @@ -123,41 +123,60 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
.await
{
Ok(activations) if activations.is_empty() => {
metrics::counter!("fetch.empty").increment(1);
debug!("No pending activations");

// Wait for pending activations to appear
backoff = true;
}

Ok(activations) => {
metrics::counter!("fetch.claimed")
.increment(activations.len() as u64);
metrics::histogram!("fetch.claim_batch_size")
.record(activations.len() as f64);

debug!("Fetched {} activations", activations.len());

for activation in activations {
let id = activation.id.clone();

if let Err(e) = pusher.push_task(activation).await {
match e {
PushError::Timeout => warn!(
match pusher.submit_task(activation).await {
Ok(()) => metrics::counter!("fetch.submit", "result" => "ok").increment(1),

Err(PushError::Timeout) => {
metrics::counter!("fetch.submit", "result" => "timeout")
.increment(1);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the match block on 153 be indented?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixed. I think cargofmt didn't do anything because it was already indented too far.


warn!(
task_id = %id,
"Submit to push pool timed out after {} milliseconds",
config.push_queue_timeout_ms
),
);

// Wait for push queue to empty
backoff = true;
}

PushError::Channel(e) => warn!(
Err(PushError::Channel(e)) => {
metrics::counter!("fetch.submit", "result" => "channel_error")
.increment(1);

warn!(
task_id = %id,
error = ?e,
"Submit to push pool failed due to channel error",
)
}
);
Comment thread
cursor[bot] marked this conversation as resolved.

backoff = true;
// Wait before trying again
backoff = true;
}
}
}


}
Comment thread
sentry[bot] marked this conversation as resolved.

Err(e) => {
metrics::counter!("fetch.store_error").increment(1);
warn!(
error = ?e,
"Store failed while fetching tasks"
Expand Down
2 changes: 1 addition & 1 deletion src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl RecordingPusher {

#[async_trait]
impl TaskPusher for RecordingPusher {
async fn push_task(&self, activation: InflightActivation) -> Result<(), PushError> {
async fn submit_task(&self, activation: InflightActivation) -> Result<(), PushError> {
self.pushed_ids.lock().await.push(activation.id.clone());

if self.fail {
Expand Down
47 changes: 36 additions & 11 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,43 @@ impl ConsumerService for TaskbrokerServer {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}

let update_result = self.store.set_status(&id, status).await;
if let Err(e) = update_result {
error!(
?id,
?status,
"Unable to update status of activation: {:?}",
e,
);
return Err(Status::internal(format!(
"Unable to update status of {id:?} to {status:?}"
)));
match self.store.set_status(&id, status).await {
Ok(Some(_)) => metrics::counter!(
"grpc_server.set_status",
"result" => "ok",
"status" => status.to_string()
)
.increment(1),

Ok(None) => metrics::counter!(
"grpc_server.set_status",
"result" => "not_found",
"status" => status.to_string()
)
.increment(1),

Err(e) => {
metrics::counter!(
"grpc_server.set_status",
"result" => "error",
"status" => status.to_string()
)
.increment(1);

error!(
?id,
?status,
"Unable to update status of activation: {:?}",
e,
);

metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed());
return Err(Status::internal(format!(
"Unable to update status of {id:?} to {status:?}"
Comment thread
sentry[bot] marked this conversation as resolved.
)));
}
}

metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed());

if self.config.delivery_mode == DeliveryMode::Push {
Expand Down
2 changes: 1 addition & 1 deletion src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ mod tests {
self.pipe
.write()
.unwrap()
.extend(take(&mut self.buffer.write().unwrap() as &mut Vec<T>).into_iter());
.extend(take(&mut self.buffer.write().unwrap() as &mut Vec<T>));
Ok(Some(()))
}

Expand Down
78 changes: 60 additions & 18 deletions src/push/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ pub enum PushError {
#[async_trait]
trait WorkerClient {
/// Send a single `PushTaskRequest` to the worker service.
///
/// When `grpc_shared_secret` is non-empty, signs with `grpc_shared_secret[0]` and sets `sentry-signature` metadata (same scheme as Python pull client and broker `AuthLayer`).
/// When `grpc_shared_secret` is not empty, it signs the request with `grpc_shared_secret[0]` and sets `sentry-signature` metadata (same scheme as Python pull client and broker `AuthLayer`).
async fn send(&mut self, request: PushTaskRequest, grpc_shared_secret: &[String])
-> Result<()>;
}
Expand All @@ -75,6 +74,7 @@ impl WorkerClient for WorkerServiceClient<Channel> {
self.push_task(req)
.await
.map_err(|status| anyhow::anyhow!(status))?;

Ok(())
}
}
Expand Down Expand Up @@ -128,9 +128,17 @@ impl PushPool {
let grpc_shared_secret = self.config.grpc_shared_secret.clone();

async move {
metrics::counter!("push.worker.connect.attempt").increment(1);

let mut worker = match WorkerServiceClient::connect(endpoint).await {
Ok(w) => w,
Ok(w) => {
metrics::counter!("push.worker.connect", "result" => "ok").increment(1);
w
}

Err(e) => {
metrics::counter!("push.worker.connect", "result" => "error")
.increment(1);
error!("Failed to connect to worker - {:?}", e);

// When we fail to connect, the taskbroker will shut down, but this may change in the future
Expand Down Expand Up @@ -167,9 +175,12 @@ impl PushPool {
.await
{
Ok(_) => {
metrics::counter!("push.delivery", "result" => "ok").increment(1);
debug!(task_id = %id, "Activation sent to worker");

if let Err(e) = store.mark_activation_processing(&id).await {
metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1);

error!(
task_id = %id,
error = ?e,
Expand All @@ -178,12 +189,16 @@ impl PushPool {
}
}

// Once processing deadline expires, status will be set back to pending
Err(e) => error!(
task_id = %id,
error = ?e,
"Failed to send activation to worker"
)
// Once claim expires, status will be set back to pending
Err(e) => {
metrics::counter!("push.delivery", "result" => "error").increment(1);

error!(
task_id = %id,
error = ?e,
"Failed to send activation to worker"
)
}
};
}
}
Expand All @@ -204,9 +219,12 @@ impl PushPool {
.await
{
Ok(_) => {
metrics::counter!("push.delivery", "result" => "ok").increment(1);
debug!(task_id = %id, "Activation sent to worker");

if let Err(e) = store.mark_activation_processing(&id).await {
metrics::counter!("push.mark_activation_processing", "result" => "error").increment(1);

error!(
task_id = %id,
error = ?e,
Expand All @@ -216,11 +234,16 @@ impl PushPool {
}

// Once processing deadline expires, status will be set back to pending
Err(e) => error!(
task_id = %id,
error = ?e,
"Failed to send activation to worker"
),
Err(e) => {
metrics::counter!("push.delivery", "result" => "error")
.increment(1);

error!(
task_id = %id,
error = ?e,
"Failed to send activation to worker"
)
}
};
}

Expand All @@ -247,15 +270,27 @@ impl PushPool {
/// Send an activation to the internal asynchronous MPMC channel used by all running push threads. Times out after `config.push_queue_timeout_ms` milliseconds.
pub async fn submit(&self, activation: InflightActivation) -> Result<(), PushError> {
let duration = Duration::from_millis(self.config.push_queue_timeout_ms);
let start = Instant::now();

metrics::gauge!("push.queue.depth").set(self.sender.len() as f64);

match tokio::time::timeout(duration, self.sender.send_async(activation)).await {
Ok(Ok(())) => Ok(()),
Ok(Ok(())) => {
metrics::histogram!("push.queue.wait_duration").record(start.elapsed());
Ok(())
}

// The channel has a problem
Ok(Err(e)) => Err(PushError::Channel(e)),
Ok(Err(e)) => {
metrics::histogram!("push.queue.wait_duration").record(start.elapsed());
Err(PushError::Channel(e))
}

// The channel was full so the send timed out
Err(_elapsed) => Err(PushError::Timeout),
Err(_elapsed) => {
metrics::histogram!("push.queue.wait_duration").record(start.elapsed());
Err(PushError::Timeout)
}
}
}
}
Expand All @@ -269,9 +304,16 @@ async fn push_task<W: WorkerClient + Send>(
grpc_shared_secret: &[String],
) -> Result<()> {
let start = Instant::now();
metrics::counter!("push.push_task.attempt").increment(1);

// Try to decode activation (if it fails, we will see the error where `push_task` is called)
let task = TaskActivation::decode(&activation.activation as &[u8])?;
let task = match TaskActivation::decode(&activation.activation as &[u8]) {
Ok(task) => task,
Err(err) => {
metrics::histogram!("push.push_task.duration").record(start.elapsed());
return Err(err.into());
}
};

let request = PushTaskRequest {
task: Some(task),
Expand Down
5 changes: 5 additions & 0 deletions src/store/adapters/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,15 @@ impl InflightActivationStore for PostgresActivationStore {
.await?;

if result.rows_affected() == 0 {
metrics::counter!("push.mark_activation_processing", "result" => "not_found")
.increment(1);

warn!(
task_id = %id,
"Activation could not be marked as processing, it may be missing or its status may have already changed"
);
} else {
metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1);
}

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions src/store/adapters/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,15 @@ impl InflightActivationStore for SqliteActivationStore {
.await?;

if result.rows_affected() == 0 {
metrics::counter!("push.mark_activation_processing", "result" => "not_found")
.increment(1);

warn!(
task_id = %id,
"Activation could not be marked as sent, it may be missing or its status may have already changed"
);
} else {
metrics::counter!("push.mark_activation_processing", "result" => "ok").increment(1);
}

Ok(())
Expand Down
Loading