Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

**Features**:

- Add metadata support for the `/upload` endpoint. ([#6028](https://github.com/getsentry/relay/pull/6028))

## 26.5.2

**Features**:
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ where
.send(Create {
scoping,
length: None,
attachment_type: item.attachment_type(),
})
.await
.ok()?
Expand Down
47 changes: 37 additions & 10 deletions relay-server/src/endpoints/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,13 @@ async fn handle_post(
check_kill_switch(&state)?;

relay_log::trace!("Validating headers");
let upload_length = tus::validate_post_headers(&headers).map_err(Error::from)?;
let headers = tus::validate_post_headers(&headers).map_err(Error::from)?;
let config = state.config();

if upload_length.is_some_and(|len| len > config.max_upload_size()) {
if headers
.upload_length
.is_some_and(|len| len > config.max_upload_size())
{
return Err(StatusCode::PAYLOAD_TOO_LARGE.into());
}

Expand All @@ -174,11 +177,11 @@ async fn handle_post(
})?;

relay_log::trace!("Checking request");
let scoping = check_request(&state, meta, upload_length, project).await?;
let scoping = validate_and_limit(&state, meta, &headers, project).await?;

// Unconditionally create the upload location:
relay_log::trace!("Creating upload location");
let result = create(&state, scoping, upload_length).await;
let result = create(&state, scoping, &headers).await;
let location = result.inspect_err(|e| {
relay_log::warn!(error = e as &dyn std::error::Error, "create failed");
})?;
Expand Down Expand Up @@ -221,7 +224,7 @@ async fn handle_patch(
})?;

relay_log::trace!("Checking request");
let scoping = check_request(&state, meta, length.value(), project).await?;
let scoping = validate(&state, meta, project).await?;

let stream = body
.into_data_stream()
Expand Down Expand Up @@ -288,13 +291,14 @@ fn check_kill_switch(state: &ServiceState) -> Result<(), StatusCode> {
async fn create(
state: &ServiceState,
scoping: Scoping,
upload_length: Option<usize>,
headers: &tus::Headers,
Comment thread
tobias-wilfert marked this conversation as resolved.
) -> Result<SignedLocation<Provisional>, Error> {
let location = state
.upload()
.send(upload::Create {
scoping,
length: upload_length,
length: headers.upload_length,
attachment_type: headers.metadata.map(|m| m.attachment_type),
})
.await??;

Expand Down Expand Up @@ -324,17 +328,20 @@ async fn upload(
///
/// This is currently the easiest way to guarantee that the upload gets checked the same way as
/// the envelope.
async fn check_request(
async fn validate_and_limit(
state: &ServiceState,
meta: RequestMeta,
upload_length: Option<usize>,
headers: &tus::Headers,
project: Project<'_>,
) -> Result<Scoping, BadStoreRequest> {
let mut envelope = Envelope::from_request(None, meta);
envelope.require_feature(Feature::UploadEndpoint);
let mut item = Item::new(ItemType::Attachment);
item.set_payload(ContentType::AttachmentRef, vec![]);
item.set_attachment_length(upload_length.unwrap_or(1));
item.set_attachment_length(headers.upload_length.unwrap_or(1));
if let Some(ref metadata) = headers.metadata {
item.set_attachment_type(metadata.attachment_type);
}
envelope.add_item(item);
let mut envelope = Managed::from_envelope(envelope, state.outcome_aggregator().clone());
let rate_limits = project
Expand All @@ -353,6 +360,26 @@ async fn check_request(
Ok(scoping)
}

async fn validate(
state: &ServiceState,
meta: RequestMeta,
project: Project<'_>,
) -> Result<Scoping, BadStoreRequest> {
let mut envelope = Envelope::from_request(None, meta);
envelope.require_feature(Feature::UploadEndpoint);
let mut envelope = Managed::from_envelope(envelope, state.outcome_aggregator().clone());

let _ = project
.check_envelope(&mut envelope)
.await
.map_err(|err| err.map(BadStoreRequest::EventRejected).into_inner())?;

// We are not really processing an envelope here, only keep the updated scoping:
let scoping = envelope.scoping();
envelope.accept(|x| x);
Ok(scoping)
}

fn is_hyper_user_error(error: &(dyn std::error::Error + 'static)) -> bool {
error
.downcast_ref::<hyper::Error>()
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use std::io;
use std::time::Duration;

use http::header::InvalidHeaderValue;
use relay_config::HttpEncoding;
pub use reqwest::StatusCode;
use reqwest::header::{HeaderMap, HeaderValue};
Expand All @@ -28,6 +29,8 @@ pub enum HttpError {
Json(#[from] serde_json::Error),
#[error("request was retried or not initialized")]
Misconfigured,
#[error("failed to construct header")]
Header(#[from] InvalidHeaderValue),
}

impl HttpError {
Expand All @@ -41,6 +44,7 @@ impl HttpError {
Self::Json(_) => false,
Self::Overflow => false,
Self::Misconfigured => false,
Self::Header(_) => false,
}
}
}
Expand Down
72 changes: 40 additions & 32 deletions relay-server/src/services/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
#[cfg(feature = "processing")]
use uuid::Uuid;

use crate::envelope::AttachmentType;
use crate::http::{HttpError, RequestBuilder, Response};

#[cfg(feature = "processing")]
Expand Down Expand Up @@ -114,6 +115,8 @@ pub struct Create {
///
/// Trusted clients (i.e. PoP Relays) are allowed to omit the length (see `Upload-Defer-Length: 1`).
pub length: Option<usize>,
/// The attachment type of the upload.
pub attachment_type: Option<AttachmentType>,
}

/// The type used to stream a request body.
Expand Down Expand Up @@ -241,11 +244,15 @@ enum Backend {
impl Service {
async fn create(
&self,
Create { scoping, length }: Create,
Create {
scoping,
length,
attachment_type,
}: Create,
) -> Result<SignedLocation<Provisional>, Error> {
match &self.backend {
Backend::Upstream { addr } => {
let (request, rx) = UploadRequest::create(scoping, length);
let (request, rx) = UploadRequest::create(scoping, length, attachment_type);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to this PR, but here we are destructuring the Create parameter in the function signature only to pass the parts individually to UploadRequest::create. Instead, we could use Create as the parameter for that function as well.

addr.send(SendRequest(request));
let response = rx.await??;
SignedLocation::try_from_response(response)
Expand Down Expand Up @@ -354,7 +361,7 @@ pub trait UploadLength: for<'de> Deserialize<'de> {

/// A provisional upload length which may or may not yet be known.
///
/// /// See also [`Final`].
/// See also [`Final`].
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(transparent)]
pub struct Provisional(Option<usize>);
Expand Down Expand Up @@ -561,11 +568,11 @@ where
enum RequestKind {
Create {
length: Option<usize>,
attachment_type: Option<AttachmentType>,
},
Upload {
location: SignedLocation<Provisional>,
stream: TakeOnce<BoundedStream<MeteredStream<ByteStream>>>,
length: Option<usize>,
encoding: HttpEncoding,
},
}
Expand All @@ -581,6 +588,7 @@ impl UploadRequest {
fn create(
scoping: Scoping,
length: Option<usize>,
attachment_type: Option<AttachmentType>,
) -> (
Self,
oneshot::Receiver<Result<Response, UpstreamRequestError>>,
Expand All @@ -590,7 +598,10 @@ impl UploadRequest {
(
Self {
scoping,
kind: RequestKind::Create { length },
kind: RequestKind::Create {
length,
attachment_type,
},
sender,
},
rx,
Expand All @@ -610,29 +621,20 @@ impl UploadRequest {
location,
stream,
} = stream;
let length = stream.length();

(
Self {
scoping,
kind: RequestKind::Upload {
location,
stream: TakeOnce::new(stream),
length,
encoding: HttpEncoding::Zstd, // just a default, will be overwritten by .configure()
},
sender,
},
rx,
)
}

/// Returns the length of the upload, if known.
fn length(&self) -> Option<usize> {
match &self.kind {
RequestKind::Create { length } | RequestKind::Upload { length, .. } => *length,
}
}
}

impl fmt::Debug for UploadRequest {
Expand Down Expand Up @@ -690,24 +692,30 @@ impl UpstreamRequest for UploadRequest {
}

fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> {
let upload_length = self.length();
if let RequestKind::Upload {
stream, encoding, ..
} = &mut self.kind
{
let Some(body) = RetryableStream::new(stream.clone()) else {
relay_log::error!("upload request stream was already consumed");
return Err(HttpError::Misconfigured);
};
tus::add_upload_headers(builder);

let body = encode_body(body, *encoding);
builder.content_encoding(*encoding);

builder.body(reqwest::Body::wrap_stream(body));
} else {
tus::add_creation_headers(upload_length, builder);
}
match &mut self.kind {
RequestKind::Create {
length,
attachment_type,
} => {
tus::add_creation_headers(*length, *attachment_type, builder)?;
}
RequestKind::Upload {
location: _,
stream,
encoding,
} => {
let Some(body) = RetryableStream::new(stream.clone()) else {
relay_log::error!("upload request stream was already consumed");
return Err(HttpError::Misconfigured);
};
tus::add_upload_headers(builder);

let body = encode_body(body, *encoding);
builder.content_encoding(*encoding);

builder.body(reqwest::Body::wrap_stream(body));
}
};

let project_key = self.scoping.project_key;
builder.header("X-Sentry-Auth", format!("Sentry sentry_key={project_key}"));
Expand Down
2 changes: 2 additions & 0 deletions relay-server/src/services/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ impl UpstreamRequestError {
UpstreamRequestError::Http(HttpError::Reqwest(_)) => "reqwest_error",
UpstreamRequestError::Http(HttpError::Overflow) => "overflow",
UpstreamRequestError::Http(HttpError::Misconfigured) => "misconfigured",
UpstreamRequestError::Http(HttpError::Header(_)) => "header",
UpstreamRequestError::RateLimited(_) => "rate_limited",
UpstreamRequestError::ResponseError(_, _) => "response_error",
UpstreamRequestError::ChannelClosed => "channel_closed",
Expand All @@ -218,6 +219,7 @@ impl IntoResponse for UpstreamRequestError {
HttpError::Io(_) => StatusCode::BAD_GATEWAY.into_response(),
HttpError::Json(_) => StatusCode::BAD_REQUEST.into_response(),
HttpError::Misconfigured => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
HttpError::Header(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
},
Self::SendFailed(e) => {
if find_error_source(&e, is_length_limit_error).is_some() {
Expand Down
Loading
Loading