feat(taskbroker): Implement retry support for raw topics#630
feat(taskbroker): Implement retry support for raw topics#630untitaker wants to merge 12 commits into
Conversation
Add retry support for raw/passthrough topics (e.g. `ingest-events`) where tasks don't have retry_state embedded in the message. Changes: - Config: Add `kafka_retry_topic` option for dedicated retry topic - Store: Add `update_retry_state` method to update activation's retry_state - gRPC: Handle `max_retries` in SetTaskStatus, call store.update_retry_state - Upkeep: Route retries to dedicated retry topic when configured - Consumer: Subscribe to both main and retry topics - Deserializer: Topic-aware routing (retry topic always uses activation deserializer) - Python client: Extract max_retries from Retry config, send in SetTaskStatusRequest When a worker reports RETRY status with max_retries, the broker updates the activation's retry_state and routes the retry to the dedicated retry topic. This prevents retries from polluting the main topic where other consumers (like SBC) can't parse activations. See https://www.notion.so/3448b10e4b5d80e7a1efee6145d504c2 → Stage 4 Depends on: getsentry/sentry-protos#251 ref STREAM-981 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The sentry-protos stubs now include max_attempts in SetTaskStatusRequest. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| } | ||
|
|
||
| if let Some(ref tx) = self.update_tx { | ||
| let max_attempts = request.get_ref().max_attempts; |
There was a problem hiding this comment.
I already asked in a different comment, but I think that code is now outdated so I'll ask here again. How often will there be a max_attempts field on the message? One out of every... 10? 100? 1000? If it's going to be present often, we'll need to rethink how batching works. Because batching is necessary to reach high throughput.
There was a problem hiding this comment.
This should be present always in tasks that run out of standard topics. I'd go further and say that maybe we should simplify the system by moving all tasks to specify the retries via the set_status method so we do not maintain more than one implementation.
Why does this affect the way batch is implemented ?
There was a problem hiding this comment.
The field will be there unconditionally on all retry status updates. So retries are effectively not batched. I can implement batching for retries but this would increase complexity.
Note that not every retry status update results in an additional DB query. max_attempts is only set in the DB if it wasn't there before.
This should be present always in tasks that run out of standard topics.
It's optional here for the sake of rollout. We can gradually increase the amount of tasks that send max_attempts through the worker and observe its impact on the broker. (the rollout mechanism isn't implemented here)
| &self, | ||
| id: &str, | ||
| status: InflightActivationStatus, | ||
| max_attempts: Option<u32>, |
There was a problem hiding this comment.
Adding this argument here sort of changes what set_status does. Before, it only set the status. Now it also updates retry_state. Most calls to set_status throughout the tests pass None because in most cases, it isn't needed. Perhaps there should be a separate method to handle this scenario? Like set_retry_status or set_retriable_status? Whatever name makes sense.
There was a problem hiding this comment.
It used to be a separate method, but in order to allow storage backends to optimize these queries I think we should pass it in a single method call.
Right now the postgres backend runs two queries, but one could imagine altering the internal schema so that setting the status and updating the max_attempts can be done in a single UPDATE. But the implementation already benefits from the fact that everything happens in set_status, because it can skip the second UPDATE for task activations that already have max_retries set.
| pub struct DeserializeConfig { | ||
| activation_config: DeserializeActivationConfig, | ||
| raw_config: Option<RawConfig>, | ||
| /// Retry topic always contains activations, even in raw_mode. |
There was a problem hiding this comment.
What does this comment mean? Why is the raw mode distinction important? I thought raw mode was the only mode in which we used the retry topic?
There was a problem hiding this comment.
I think I understand now. You mean that every message in the retry topic is guaranteed to be an activation even in raw mode, whereas messages in the "normal" topic in raw mode may not be?
There was a problem hiding this comment.
I think I understand now. You mean that every message in the retry topic is guaranteed to be an activation even in raw mode, whereas messages in the "normal" topic in raw mode may not be?
Every message in the retry topic is a TaskActivation protobuf regardless of whether the consumer is in raw mode or normal mode. This is because we need to store the retry count in the topic, somehow.
fpacifici
left a comment
There was a problem hiding this comment.
One high level comment
https://github.com/getsentry/taskbroker/pull/630/changes#r3262500919
| id=processing_result.task_id, | ||
| status=processing_result.status, | ||
| fetch_next_task=fetch_next_task, | ||
| max_attempts=processing_result.max_attempts, |
There was a problem hiding this comment.
Do we have an example of how the task will define this value ?
There was a problem hiding this comment.
nothing changes about how this value is defined. it's already public API: https://github.com/getsentry/sentry/blob/9ee21b63ae7bcd3bf9a002e077e7fc73b860c656/src/sentry/deletions/tasks/scheduled.py#L102-L103
| retry = task_func.retry | ||
| if retry and retry.should_retry(inflight.activation.retry_state, err): | ||
| next_state = TASK_ACTIVATION_STATUS_RETRY | ||
| max_attempts_val = retry._times + 1 |
There was a problem hiding this comment.
max_attempts shouldn't change. If we increase that value, the task will get an increasing number of retries and might not ever exhaust all retries.
There was a problem hiding this comment.
this is the constant retry.times from the public API: https://github.com/getsentry/sentry/blob/9ee21b63ae7bcd3bf9a002e077e7fc73b860c656/src/sentry/deletions/tasks/scheduled.py#L102-L103
retry._times is a constant, so +1 is a constant. we're not continuously incrementing.
| } | ||
|
|
||
| if let Some(ref tx) = self.update_tx { | ||
| let max_attempts = request.get_ref().max_attempts; |
There was a problem hiding this comment.
This should be present always in tasks that run out of standard topics. I'd go further and say that maybe we should simplify the system by moving all tasks to specify the retries via the set_status method so we do not maintain more than one implementation.
Why does this affect the way batch is implemented ?
| // Use batching channel if available and we don't need to update retry state. | ||
| // If max_attempts is Some, we can't use batching API to update the activation, and have to | ||
| // fall back to individual set_status. | ||
| if let Some(ref tx) = self.update_tx | ||
| && max_attempts.is_none() | ||
| { |
There was a problem hiding this comment.
Taking a step back on the retry topic.
If the retry topic contains activations rather than the original message, why do we even need a topic ? can't we just store the activation in the DB as pending and treat it like a task for the rest of its lifetime ?
I recognize this is a departure from the original intent of this PR, but it seems a lot simpler to me to manage it this way. The idea of the topic, to me, was meant to use it as a DLQ as well. Am I missing something ?
There was a problem hiding this comment.
can't we just store the activation in the DB as pending and treat it like a task for the rest of its lifetime ?
I am missing rationale for why the retry system was set up this way to begin with. Your suggestion also applies to how regular tasks work. I would not want to special-case raw-mode to handle retries fundamentally differently than regular tasks.
My guess is that we wanted to keep the size of a database under control, therefore pruning queued retries out of the DB and putting them back into Kafka. If we say that this is not really a concern with AlloyDB then that's fine, but we'd have to validate that IMO
I can explore this option in another PR, but not sure we should roll it out without having more context from folks who originally worked on taskbroker.
The idea of the topic, to me, was meant to use it as a DLQ as well
That is yet another topic. It can stay or go away regardless of what we decide wrt retries.
There was a problem hiding this comment.
@markstory @enochtangg do you remember why we didn't "just" stick retries into the DB and produce them back into kafka?
There was a problem hiding this comment.
Your suggestion also applies to how regular tasks work.
I thought we picked up the task from the database to do the retry. @george-sentry did we change something in the push model ?
There was a problem hiding this comment.
from slack:
@markstory: Keeping retries in sqlite/postgres could work but those retries will consume slots in sqlite/postgres. It will mean that a retry with no delay runs right away though instead of 'later' when it is found again in the topic.
@enochtangg: Another benefit I remember was because it resets the latency metric. Since latency is task dispatched - kafka receive latency, re-producing in kafka means we don't need to somehow fix that.
@fpacifici: the latency metric argument is important. I think we can keep it as it is and use the topic. It is true that in AlloyDB there will be more room, but we will have the sqlite around for a while. No need to make the change
So I think I won't change anything here.
…nstrument transaction start
- Sync sentry-protos version across Rust (Cargo.toml) and Python (pyproject.toml) - Update uv.lock to use sentry-protos 0.10.0 (was 0.8.13) - Merge main to fix rebalance integration test Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| let max_attempts = request.get_ref().max_attempts; | ||
|
|
||
| // Use batching channel if available and we don't need to update retry state. | ||
| // If max_attempts is Some, we can't use batching API to update the activation, and have to | ||
| // fall back to individual set_status. | ||
| if let Some(ref tx) = self.update_tx | ||
| && max_attempts.is_none() | ||
| { | ||
| tx.send((id, status)) | ||
| .await | ||
| .map_err(|_| Status::internal("Status update channel closed"))?; |
There was a problem hiding this comment.
Bug: The Python worker unconditionally sends max_attempts for tasks with retry policies. This forces the gRPC server to bypass its batching optimization, causing an N+1 database query issue.
Severity: HIGH
Suggested Fix
Modify the Python worker to only send max_attempts when a task is actually being retried. For completed or failed tasks, max_attempts should not be sent, allowing the gRPC server's max_attempts.is_none() check to pass and utilize the intended batching optimization.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: src/grpc/server.rs#L106-L119
Potential issue: The Python worker unconditionally sends a `max_attempts` value for any
task with a retry decorator, regardless of its final status (complete, failure, or
retry). In the gRPC server, the batching channel (`update_tx`) is only used if
`max_attempts.is_none()`. Because the Python client always sends `max_attempts`, this
condition is never met for tasks with retry policies. This forces a fallback to an
individual `self.store.set_status()` call for each task, creating a significant
performance regression by introducing an N+1 database query problem instead of using a
single batched update.
… as main topic in raw mode
62df298 to
0bf9105
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 0bf9105. Configure here.
| task_func.retry._times + 1 | ||
| if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY | ||
| else None | ||
| ), |
There was a problem hiding this comment.
Raw retries lose delay
Medium Severity
On retry, the worker only sends max_attempts to the broker. For raw-mode activations, retry_state was absent in the stored blob; set_status then inserts a minimal state with only max_attempts. Upkeep republish uses delay_on_retry from that blob, so configured retry backoff is dropped and retries run immediately.
Reviewed by Cursor Bugbot for commit 0bf9105. Configure here.
There was a problem hiding this comment.
damn, i thought we had a nonzero default value for this. will probably add delay_on_retry to SetTaskStatusRequest
There was a problem hiding this comment.


Summary
Implement two independent features in this PR:
kafka_retry_topicso that one can send activations from retried tasks into a separate topic. That topic never contains raw messages.max_retriesfrom the worker. Previously the producer would configuremax_retriesand send them as part of the activation, but that cannot work with raw topics, so an architecture change is needed.See Architecture doc → Stage 4 for full context.
Dependencies
Depends on: getsentry/sentry-protos#251 (adds
max_retriesfield toSetTaskStatusRequest)ref STREAM-981