feat(upload): Retry objectstore requests#5836
Conversation
There was a problem hiding this comment.
I think in an ideal world you only need this:
- A wrapper which keeps track of whether a stream was polled or not. This is just another
Streamcombinator. - The wrapper allows recovery if it hasn't been polled yet.
- You pass a
&mut wrapper_streamto the client to transfer it. - The request fails, you attempt to recover from the wrapper via
wrapper_stream.reocver() -> Option<S> - If you get
None, the connection failed, if you getSomeyou can attempt to retry
There is just a small problem where the objectstore client requires BoxStream<'static, ..>, which I assume can't be changed because of reqwest requirements (?).
But you can can keep a similar API and functionality, you now need to employ interior mutability. Which leads to something similar what you have with the API surface. But instead of doing the Drop/oneshot dance I'd look into using a Mutex<Option<T>> (or fancier atomic based variant) and take()'ing the item on poll instead of trying to recover on Drop.
This avoids the oneshot and drop dance, which actually depends on the client dropping the stream where there is no guarantee it actually has to since you're giving full ownership.
My first iteration had an |
It's a bit awkward to deal with directly, but I think with the right amount of wrappers (maybe some kind of |
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| } | ||
| Objectstore::TraceAttachment(attachment) => { | ||
| self.handle_trace_attachment(attachment).await | ||
| let _ = self.handle_trace_attachment(attachment).await; |
There was a problem hiding this comment.
This is another case where the function ideally wouldn't even return anything. I didn't want to touch that function in this PR though.
| relay_statsd::metric!( | ||
| counter(RelayCounters::AttachmentUpload) += 1, | ||
| result = match &result { | ||
| Ok(_) => "success", | ||
| Err(e) => e.as_str(), | ||
| }, | ||
| type = "envelope", | ||
| ); |
There was a problem hiding this comment.
This metric is now logged further down the call stack, so it disappears in a few places.
| pub enum RetriableStream<S> { | ||
| /// The stream has not been polled. | ||
| /// Other owners of `S` can recover it by calling [`TakeOnce::take`]. | ||
| New(TakeOnce<S>), | ||
| /// The stream has been polled at least once and is no longer retriable. | ||
| /// | ||
| /// This state is an optimization such that the stream does not have to lock a mutex | ||
| /// on every poll. | ||
| Used(S), | ||
| } |
There was a problem hiding this comment.
The reason why I added a new type, rather than implementing Stream for TakeOnce<S: Stream>, is that there is no point in having a mutex after the first poll -> the internal state of the stream transitions to a simple wrapper.
| } | ||
|
|
||
| /// Takes the item, making it inaccessible for other owners. | ||
| pub fn take(&mut self) -> Option<T> { |
There was a problem hiding this comment.
Shouldn't the signature here not include the mut?:
| pub fn take(&mut self) -> Option<T> { | |
| pub fn take(&self) -> Option<T> { |
| retention_hours: Option<u16>, | ||
| ) -> Result<ObjectstoreKey, Error> { | ||
| tokio::time::timeout(self.timeout, async { | ||
| let mut result = Err(Error::Config("zero retries configured")); |
There was a problem hiding this comment.
NonZero* type in the config and make that state impossible?
|
|
||
| match &result { | ||
| Err(e) if e.is_retriable() => { | ||
| tokio::time::sleep(self.retry_interval).await; |
There was a problem hiding this comment.
We have a RetryBackoff type, maybe worth also using here.
There was a problem hiding this comment.
👍 , will consider it in case the simple loop doesn't work.
| | StatusCode::TOO_MANY_REQUESTS | ||
| | StatusCode::SERVICE_UNAVAILABLE | ||
| ) | ||
| ); |
There was a problem hiding this comment.
Retries 503 but PR specifies only 429, 502, 504
Low Severity
is_retriable includes StatusCode::SERVICE_UNAVAILABLE (503) in the set of retriable status codes, but the PR description explicitly states "Only 429, 502, 504 responses and connection errors are retried." While 503 is arguably a reasonable retriable status, it wasn't part of the documented intent and could lead to unexpected retry behavior for service-unavailable responses.
Reviewed by Cursor Bugbot for commit 4317a26. Configure here.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 5259894. Configure here.
| counter(RelayCounters::AttachmentUpload) += 1, | ||
| result = e.as_str(), |
There was a problem hiding this comment.
Bug: In handle_trace_attachment, a session creation failure causes an early return, preventing the AttachmentUpload metric from being emitted and silently dropping the error.
Severity: MEDIUM
Suggested Fix
Modify the error handling in handle_trace_attachment to ensure the AttachmentUpload metric is emitted when session creation fails. This can be done by explicitly handling the session creation error and emitting the metric before returning, similar to how it's handled in handle_envelope and handle_event_attachment.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: relay-server/src/services/objectstore.rs#L413-L414
Potential issue: When creating a session for a trace attachment fails within
`do_handle_trace_attachment`, the error is wrapped as `Error::UploadFailed` and the
function returns early. This prevents the `upload()` function from being called, which
is where the `AttachmentUpload` metric is supposed to be emitted. Consequently, session
creation failures for trace attachments are not reported, creating an observability gap.
This behavior is inconsistent with other handlers like `handle_envelope` and
`handle_event_attachment`, which correctly emit metrics in this scenario.
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>


If necessary, retry requests to objectstore.
fixes: INGEST-826