-
Notifications
You must be signed in to change notification settings - Fork 108
ref(server): Streaming forward requests #5624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| UpstreamRequestError::Http(HttpError::Json(_)) => "invalid_json", | ||
| UpstreamRequestError::Http(HttpError::Reqwest(_)) => "reqwest_error", | ||
| UpstreamRequestError::Http(HttpError::Overflow) => "overflow", | ||
| UpstreamRequestError::Http(HttpError::Misconfigured) => "misconfigured", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally the type system would disallow misconfiguring a request, but that would be a much bigger change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe worth a relay_log::error! if that doesn't exist already?
| let this = self.get_mut(); | ||
| let poll = Pin::new(this.body.get_mut()).poll_frame(cx); | ||
| let inner = this.body.get_mut(); | ||
| this.size_hint = inner.size_hint(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
size_hint presumably does not change between polls, but I considered it good measure to update the entire state on every mutable access.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah either seems fine (I'd guess getting the size hint is cheap), just important to update the is_end_stream.
| "/api/42/store/", | ||
| "/api/42/envelope/", | ||
| "/api/42/attachment/", | ||
| "/api/666/envelope/", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WIth streaming enabled, /attachment/ would immediately respond with 404, so I changed this to an existing endpoint in mini_sentry.
relay-server/src/utils/forward.rs
Outdated
| builder.body(self.body.clone()); | ||
| let body = self.body.take().ok_or(HttpError::Misconfigured)?; | ||
|
|
||
| let new_body = reqwest::Body::wrap(SyncBody::new(body)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot plug in the axum body here because it is not Sync, and we cannot plug in SyncWrapper because it does not implement http_body::Body. We could plug in Body::into_data_stream but that would lose the size hint, resulting in the outgoing transfer-encoding always being chunked.
| # Store endpoints theoretically support chunked transfer encoding, | ||
| # but for now, we're conservative and don't allow that anywhere. | ||
| if flask_request.headers.get("transfer-encoding"): | ||
| if not sentry.allow_chunked and flask_request.headers.get("transfer-encoding"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes sense to deny chunked encoding for most tests. However, some tests just use mini_sentry as an echo server. For those, chunked encoding should be allowed.
| symbolic-common = { version = "12.12.3", default-features = false } | ||
| symbolic-unreal = { version = "12.12.3", default-features = false } | ||
| syn = { version = "2.0.106" } | ||
| sync_wrapper = { version = "1.0.2" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was already in our dependency tree.
| UpstreamRequestError::Http(HttpError::Json(_)) => "invalid_json", | ||
| UpstreamRequestError::Http(HttpError::Reqwest(_)) => "reqwest_error", | ||
| UpstreamRequestError::Http(HttpError::Overflow) => "overflow", | ||
| UpstreamRequestError::Http(HttpError::Misconfigured) => "misconfigured", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe worth a relay_log::error! if that doesn't exist already?
| let this = self.get_mut(); | ||
| let poll = Pin::new(this.body.get_mut()).poll_frame(cx); | ||
| let inner = this.body.get_mut(); | ||
| this.size_hint = inner.size_hint(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah either seems fine (I'd guess getting the size hint is cheap), just important to update the is_end_stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
| .into_response(), | ||
| HttpError::Io(_) => StatusCode::BAD_GATEWAY.into_response(), | ||
| HttpError::Json(_) => StatusCode::BAD_REQUEST.into_response(), | ||
| HttpError::Misconfigured => StatusCode::INTERNAL_SERVER_ERROR.into_response(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Streaming body limit errors return 502 instead of 413
Medium Severity
When the streamed body exceeds the RequestBodyLimitLayer limit during forwarding, the LengthLimitError from http_body_util::Limited propagates as a reqwest::Error, which maps to UpstreamRequestError::SendFailed and ultimately returns 502 Bad Gateway. Previously, with DefaultBodyLimit + Bytes extraction, exceeding the limit returned 413 Payload Too Large before the handler even ran. This regression particularly affects compressed requests, where RequestDecompressionLayer decompresses the body before RequestBodyLimitLayer sees it — the original Content-Length (compressed size) is no longer accurate, so the upfront check passes but the decompressed stream exceeds the limit mid-flight.


When outdated external Relays get a request to an unknown endpoint, they should forward it to our internal Relays for compatibility. However, they should not be forced to buffer the entire request in-memory, especially not for the upcoming large file uploads.
This PR changes the forward endpoint to stream the request upstream without buffering the full body, and lays the foundation for other streaming endpoints.
Fixes INGEST-679.