feat(taskbroker): Add Claimed Status to Handle Push Failures #586
george-sentry merged 18 commits into main from
Conversation
…tivations Marked Sent
| /// Unused but necessary to align with sentry-protos | ||
| Unspecified, | ||
| Pending, | ||
| Sending, |
What if we had a Claimed status? The lifecycle could be
flowchart
pending -- taken by a push thread and going to send soon --> claimed
claimed -- sent to a worker successfully --> processing
processing --> complete
processing --> retry
processing --> failure
You have 'claim' in a bunch of the methods, but no status reflecting that, and with the addition of Sending, the Processing status can mean two different things depending on the broker mode.
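The proposed lifecycle can be sketched as a small state machine. This is only an illustration of the flowchart above; the enum and function names are assumptions, not the actual taskbroker types (the Claimed-to-Pending edge comes from the "revert undelivered activations" path discussed later in this thread):

```rust
// Hypothetical sketch of the status lifecycle from the flowchart above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Claimed,    // taken by a push thread, about to be sent
    Processing, // delivered to a worker successfully
    Complete,
    Retry,
    Failure,
}

// Valid transitions according to the proposed flowchart.
fn can_transition(from: Status, to: Status) -> bool {
    use Status::*;
    matches!(
        (from, to),
        (Pending, Claimed)
            | (Claimed, Processing)
            | (Processing, Complete)
            | (Processing, Retry)
            | (Processing, Failure)
            // releasing a claim: an undelivered task goes back to pending
            | (Claimed, Pending)
    )
}

fn main() {
    assert!(can_transition(Status::Pending, Status::Claimed));
    assert!(!can_transition(Status::Pending, Status::Processing));
}
```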
This is functionally the same as before, right? Changing sending to claimed?
It sounds like it is. I don't mind changing sending to claimed if it's more clear that way.
Yeah, it is just a naming change.
| status: InflightActivationStatus, | ||
| ) -> Result<Vec<InflightActivation>, Error>; | ||
| /// Claims `limit` activations within the `bucket` range. Push mode uses status `Sending` until `mark_activation_sent` moves to `Processing`. |
With the methods using claim in their name, Claimed could be a better status name.
I think this is a good idea. Thoughts @evanh?
Yeah I think it makes sense. Cleans up the nomenclature for sure.
| } | ||
| #[instrument(skip_all)] | ||
| async fn mark_activation_sent(&self, id: &str) -> Result<(), Error> { |
Sent isn't a status/state in the state machine; should this be mark_activation_processing?
| if let Ok(query_res) = most_once_result { | ||
| processing_deadline_modified_rows = query_res.rows_affected(); | ||
| } | ||
| // Revert activations that weren't delivered back to 'pending' without consuming an attempt |
In other task systems, this is referred to as releasing a claim.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: SQLite `unixepoch` uses unsupported 'milliseconds' modifier: Replaced the invalid SQLite `milliseconds` modifier with a seconds expression using fractional seconds, so `claim_expires_at` is now populated correctly in push claims.
Preview (4d21eabfe0)
diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs
--- a/src/store/inflight_activation.rs
+++ b/src/store/inflight_activation.rs
@@ -930,7 +930,7 @@
query_builder.push_bind(InflightActivationStatus::Processing);
} else {
query_builder.push(format!(
- "claim_expires_at = unixepoch('now', '+' || {claim_lease_ms} || ' milliseconds', '+' || {grace_period} || ' seconds'), processing_deadline = NULL, status = "
+ "claim_expires_at = unixepoch('now', '+' || ({claim_lease_ms} / 1000.0) || ' seconds', '+' || {grace_period} || ' seconds'), processing_deadline = NULL, status = "
));
query_builder.push_bind(InflightActivationStatus::Claimed);
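The gist of the autofix: SQLite's unixepoch() accepts 'seconds' modifiers but not 'milliseconds', so the millisecond lease has to be divided down to fractional seconds inside the expression. A minimal sketch of the expression construction (the function and its arguments are hypothetical, mirroring the names in the diff):

```rust
// Sketch: build the SQLite claim-expiry expression, converting a millisecond
// lease into fractional seconds because unixepoch() has no 'milliseconds'
// modifier. `claim_lease_ms` and `grace_period` are assumed names.
fn claim_expiry_expr(claim_lease_ms: u64, grace_period: u64) -> String {
    format!(
        "claim_expires_at = unixepoch('now', '+' || ({lease} / 1000.0) || ' seconds', '+' || {grace} || ' seconds')",
        lease = claim_lease_ms,
        grace = grace_period,
    )
}

fn main() {
    let expr = claim_expiry_expr(5000, 3);
    // a 5000 ms lease is expressed as 5000 / 1000.0 = 5.0 seconds
    assert!(expr.contains("(5000 / 1000.0)"));
}
```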
markstory left a comment
Looks good to me, outside of the at-most-once task handling. While I don't think what you have is wrong, it does run the risk of dropping tasks without ever executing them.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 3 total unresolved issues (including 1 from previous review).
Reviewed by Cursor Bugbot for commit 2156a43.
| max_processing_attempts: config.max_processing_attempts, | ||
| vacuum_page_count: config.vacuum_page_count, | ||
| processing_deadline_grace_sec: config.processing_deadline_grace_sec, | ||
| claim_lease_ms: config.fetch_batch_size.max(1) as u64 * config.push_queue_timeout_ms, |
Claim lease formula omits push timeout causing premature expiration
Medium Severity
The claim_lease_ms formula is fetch_batch_size * push_queue_timeout_ms, which only covers the time to enqueue tasks into the push pool's bounded channel. After enqueuing, the actual gRPC push can take up to push_timeout_ms (default 30s). With defaults, claims expire after ~8s (5s lease + 3s grace), but pushes can legitimately take up to 30s. If a push takes longer than 8s but succeeds, upkeep reverts the task to Pending before mark_activation_processing runs, causing the successfully-delivered task to be re-claimed and pushed again — duplicate execution.
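One way to account for the push time described above is to add the worst-case gRPC push duration on top of the enqueue time. This is only a sketch of the bound Bugbot is suggesting, not the project's actual fix; the struct and field names mirror the quoted config but are assumptions:

```rust
// Assumed config fields, mirroring the ones quoted in the diff above.
struct Config {
    fetch_batch_size: usize,
    push_queue_timeout_ms: u64,
    push_timeout_ms: u64,
}

// Sketch: lease covers the time to enqueue the whole batch into the push
// pool PLUS the time for one gRPC push to complete, so a slow-but-successful
// push cannot outlive its claim.
fn claim_lease_ms(config: &Config) -> u64 {
    (config.fetch_batch_size.max(1) as u64) * config.push_queue_timeout_ms
        + config.push_timeout_ms
}

fn main() {
    let cfg = Config {
        fetch_batch_size: 5,
        push_queue_timeout_ms: 1000,
        push_timeout_ms: 30_000,
    };
    // 5 * 1000 ms enqueue budget + 30 000 ms push budget
    assert_eq!(claim_lease_ms(&cfg), 35_000);
}
```

As the reply below notes, picking the right value is hard either way; making it configurable is another option.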
This is possibly a valid point. It may be a better idea to just make this value configurable, but then we'll need to pick an appropriate value, which is hard. Let's remember this for the future.
| .expect("Could not create kafka producer in upkeep"), | ||
| ); | ||
| if let Ok(tasks) = store | ||
| .get_pending_activations_from_namespaces(None, Some(&demoted_namespaces), None, None) | ||
| .claim_activations(None, Some(&demoted_namespaces), None, None, false) | ||
| .await | ||
| { | ||
| // Produce tasks to Kafka with updated namespace |
Bug: Tasks for demoted namespaces that fail Kafka publishing get stuck in an infinite retry loop because processing_attempts is never incremented, preventing them from reaching Failure status.
Severity: HIGH
Suggested Fix
To fix this, ensure the processing_attempts counter is incremented for this failure path: use mark_processing: true for demoted namespaces, call mark_activation_processing after a successful publish, or increment the counter when reverting a task from Claimed to Pending upon expiration.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.
Location: src/upkeep.rs#L318-L324
Potential issue: When forwarding tasks for demoted namespaces, the system first moves
them to a `Claimed` status. If the subsequent Kafka publish fails, these tasks remain
`Claimed` until they expire. The expiration handler, `handle_claim_expiration()`,
reverts them to `Pending` but does not increment the `processing_attempts` counter. This
creates an infinite loop where tasks that consistently fail to publish are retried
indefinitely, never reaching the `max_processing_attempts` limit and never being moved
to the `Failure` status for dead-lettering.
The AI reviewers constantly get tripped up by this code. If I mark them as claimed, it will complain about failed publishing resulting in infinite retries. If I mark them as processing, it will complain that we will run out of tries after the processing deadline is exceeded too many times. In other words, no matter which way you go, this chunk of code will be flagged.



Linear
Completes STREAM-860
Description
Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.
Right now, I rely on processing_deadline to revert processing tasks back to pending when pushing them fails. This isn't good, because it eats through processing attempts, resulting in needlessly dropped tasks.
I want to add a Sending status that indicates a task is being sent. Now, upkeep reverts tasks that are still in "sending" when their processing deadlines expire back to "pending" without consuming an attempt. If the status is "processing," that means the task was already sent successfully, so its processing attempts can be incremented. This will help us avoid dropping tasks needlessly when workers are busy.
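The rule this description proposes can be sketched as follows. Everything here is an assumption for illustration (names, signature, and the failure threshold), not the actual upkeep code:

```rust
// Hypothetical sketch of the deadline-expiry rule described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Sending,
    Processing,
    Failure,
}

// On deadline expiry: a task still in Sending was never delivered, so it is
// released back to Pending without consuming an attempt. A Processing task
// was delivered, so its attempt counter advances, and it fails permanently
// once it runs out of attempts.
fn on_expiry(status: Status, attempts: u32, max_attempts: u32) -> (Status, u32) {
    match status {
        Status::Sending => (Status::Pending, attempts),
        Status::Processing if attempts + 1 >= max_attempts => (Status::Failure, attempts + 1),
        Status::Processing => (Status::Pending, attempts + 1),
        other => (other, attempts),
    }
}

fn main() {
    // push never completed: no attempt consumed
    assert_eq!(on_expiry(Status::Sending, 2, 3), (Status::Pending, 2));
    // delivered but deadline exceeded: attempt consumed
    assert_eq!(on_expiry(Status::Processing, 0, 3), (Status::Pending, 1));
}
```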
Note that my original plan was different. You can see it in the commit history. Here is a description of that plan.