Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): Accept and forward unknown envelope items #1246

Merged
merged 7 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
**Bug Fixes**:

- fix(metrics): Enforce metric name length limit. ([#1238](https://github.com/getsentry/relay/pull/1238))
- Accept and forward unknown Envelope items. In processing mode, drop items individually rather than rejecting the entire request. This allows SDKs to send new data in combined Envelopes in the future. ([#1246](https://github.com/getsentry/relay/pull/1246))

**Internal**:

Expand Down
24 changes: 24 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,22 @@ impl Default for Limits {
}
}

/// Controls traffic steering.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct Routing {
/// Accept and forward unknown Envelope items to the upstream.
///
/// Forwarding unknown items should be enabled in most cases to allow proxying traffic for newer
/// SDK versions. The upstream in Sentry makes the final decision on which items are valid. If
/// this is disabled, just the unknown items are removed from Envelopes, and the rest is
/// processed as usual.
///
/// Defaults to `true` for all Relay modes other than processing mode. In processing mode, this
/// is disabled by default since the item cannot be handled.
unknown_items: Option<bool>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we call this accept_unknown_items?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally had it as forward_unknown_items, but it seemed rather long. Subjectively, the combined key of routing.unknown_items: false is clear to me, but I'm happy to update to a more descriptive name if you prefer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same as joris but I get the reasoning from jan as well, so i don't have an opinion on this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it would make sense to make it consistent with the function name, but I did not consider the routing. prefix. With the prefix I think it's clear enough.

}

/// Http content encoding for both incoming and outgoing web requests.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
Expand Down Expand Up @@ -1224,6 +1240,8 @@ struct ConfigValues {
#[serde(default)]
logging: relay_log::LogConfig,
#[serde(default)]
routing: Routing,
#[serde(default)]
metrics: Metrics,
#[serde(default)]
sentry: relay_log::SentryConfig,
Expand Down Expand Up @@ -1940,6 +1958,12 @@ impl Config {
pub fn static_relays(&self) -> &HashMap<RelayId, RelayInfo> {
&self.values.auth.static_relays
}

/// Returns `true` if unknown items should be accepted and forwarded.
pub fn accept_unknown_items(&self) -> bool {
let forward = self.values.routing.unknown_items;
forward.unwrap_or_else(|| !self.processing_enabled())
}
}

impl Default for Config {
Expand Down
33 changes: 18 additions & 15 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ impl EnvelopeProcessor {
/// is written back into the item.
fn process_user_reports(&self, state: &mut ProcessEnvelopeState) {
state.envelope.retain_items(|item| {
if item.ty() != ItemType::UserReport {
if item.ty() != &ItemType::UserReport {
return true;
};

Expand Down Expand Up @@ -831,7 +831,7 @@ impl EnvelopeProcessor {
if self.config.processing_enabled() {
state
.envelope
.retain_items(|item| item.ty() != ItemType::ClientReport);
.retain_items(|item| item.ty() != &ItemType::ClientReport);
}
return;
}
Expand All @@ -846,7 +846,7 @@ impl EnvelopeProcessor {
// we're going through all client reports but we're effectively just merging
// them into the first one.
state.envelope.retain_items(|item| {
if item.ty() != ItemType::ClientReport {
if item.ty() != &ItemType::ClientReport {
return true;
};
match ClientReport::parse(&item.payload()) {
Expand Down Expand Up @@ -1038,7 +1038,7 @@ impl EnvelopeProcessor {
fn expand_unreal(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
let envelope = &mut state.envelope;

if let Some(item) = envelope.take_item_by(|item| item.ty() == ItemType::UnrealReport) {
if let Some(item) = envelope.take_item_by(|item| item.ty() == &ItemType::UnrealReport) {
utils::expand_unreal_envelope(item, envelope, &self.config)?;
}

Expand Down Expand Up @@ -1275,13 +1275,16 @@ impl EnvelopeProcessor {
ItemType::Attachment => false,
ItemType::UserReport => false,

// aggregate data is never considered as part of deduplication
// Aggregate data is never considered as part of deduplication
ItemType::Session => false,
ItemType::Sessions => false,
ItemType::Metrics => false,
ItemType::MetricBuckets => false,
ItemType::ClientReport => false,
ItemType::Profile => false,

// Without knowing more, `Unknown` items are allowed to be repeated
ItemType::Unknown(_) => false,
}
}

Expand All @@ -1299,11 +1302,11 @@ impl EnvelopeProcessor {
// Remove all items first, and then process them. After this function returns, only
// attachments can remain in the envelope. The event will be added again at the end of
// `process_event`.
let event_item = envelope.take_item_by(|item| item.ty() == ItemType::Event);
let transaction_item = envelope.take_item_by(|item| item.ty() == ItemType::Transaction);
let security_item = envelope.take_item_by(|item| item.ty() == ItemType::Security);
let raw_security_item = envelope.take_item_by(|item| item.ty() == ItemType::RawSecurity);
let form_item = envelope.take_item_by(|item| item.ty() == ItemType::FormData);
let event_item = envelope.take_item_by(|item| item.ty() == &ItemType::Event);
let transaction_item = envelope.take_item_by(|item| item.ty() == &ItemType::Transaction);
let security_item = envelope.take_item_by(|item| item.ty() == &ItemType::Security);
let raw_security_item = envelope.take_item_by(|item| item.ty() == &ItemType::RawSecurity);
let form_item = envelope.take_item_by(|item| item.ty() == &ItemType::FormData);
let attachment_item = envelope
.take_item_by(|item| item.attachment_type() == Some(AttachmentType::EventPayload));
let breadcrumbs1 = envelope
Expand All @@ -1313,7 +1316,7 @@ impl EnvelopeProcessor {

// Event items can never occur twice in an envelope.
if let Some(duplicate) = envelope.get_item_by(|item| self.is_duplicate(item)) {
return Err(ProcessingError::DuplicateItem(duplicate.ty()));
return Err(ProcessingError::DuplicateItem(duplicate.ty().clone()));
}

let (event, event_len) = if let Some(mut item) = event_item.or(security_item) {
Expand Down Expand Up @@ -2007,7 +2010,7 @@ impl Handler<ProcessMetrics> for EnvelopeProcessor {

for item in items {
let payload = item.payload();
if item.ty() == ItemType::Metrics {
if item.ty() == &ItemType::Metrics {
let mut timestamp = item.timestamp().unwrap_or(received_timestamp);
clock_drift_processor.process_timestamp(&mut timestamp);

Expand All @@ -2024,7 +2027,7 @@ impl Handler<ProcessMetrics> for EnvelopeProcessor {
relay_log::trace!("inserting metrics into project cache");
project_cache.do_send(InsertMetrics::new(public_key, metrics));
}
} else if item.ty() == ItemType::MetricBuckets {
} else if item.ty() == &ItemType::MetricBuckets {
match Bucket::parse_all(&payload) {
Ok(mut buckets) => {
for bucket in &mut buckets {
Expand Down Expand Up @@ -3037,7 +3040,7 @@ mod tests {
let new_envelope = envelope_response.envelope.unwrap();

assert_eq!(new_envelope.len(), 1);
assert_eq!(new_envelope.items().next().unwrap().ty(), ItemType::Event);
assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event);
}

#[test]
Expand Down Expand Up @@ -3150,7 +3153,7 @@ mod tests {

let envelope = envelope_response.envelope.unwrap();
let item = envelope.items().next().unwrap();
assert_eq!(item.ty(), ItemType::ClientReport);
assert_eq!(item.ty(), &ItemType::ClientReport);
}

#[test]
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/actors/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ impl Message for StoreEnvelope {
///
/// Slow items must be routed to the `Attachments` topic.
fn is_slow_item(item: &Item) -> bool {
item.ty() == ItemType::Attachment || item.ty() == ItemType::UserReport
item.ty() == &ItemType::Attachment || item.ty() == &ItemType::UserReport
}

impl Handler<StoreEnvelope> for StoreForwarder {
Expand All @@ -712,7 +712,7 @@ impl Handler<StoreEnvelope> for StoreForwarder {

let topic = if envelope.get_item_by(is_slow_item).is_some() {
KafkaTopic::Attachments
} else if event_item.map(|x| x.ty()) == Some(ItemType::Transaction) {
} else if event_item.map(|x| x.ty()) == Some(&ItemType::Transaction) {
KafkaTopic::Transactions
} else {
KafkaTopic::Events
Expand Down
17 changes: 14 additions & 3 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ pub fn event_id_from_formdata(data: &[u8]) -> Result<Option<EventId>, BadStoreRe
/// Extracting the event id from chunked formdata fields on the Minidump endpoint (`sentry__1`,
/// `sentry__2`, ...) is not supported. In this case, `None` is returned.
pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreRequest> {
if let Some(item) = items.iter().find(|item| item.ty() == ItemType::Event) {
if let Some(item) = items.iter().find(|item| item.ty() == &ItemType::Event) {
if let Some(event_id) = event_id_from_json(&item.payload())? {
return Ok(Some(event_id));
}
Expand All @@ -262,7 +262,7 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreReq
}
}

if let Some(item) = items.iter().find(|item| item.ty() == ItemType::FormData) {
if let Some(item) = items.iter().find(|item| item.ty() == &ItemType::FormData) {
// Swallow all other errors here since it is quite common to receive invalid secondary
// payloads. `EnvelopeProcessor` also retains events in such cases.
if let Ok(Some(event_id)) = event_id_from_formdata(&item.payload()) {
Expand Down Expand Up @@ -346,8 +346,14 @@ where

let future = extract_envelope(&request, meta)
.into_future()
.and_then(clone!(envelope_context, |envelope| {
.and_then(clone!(config, envelope_context, |mut envelope| {
envelope_context.borrow_mut().update(&envelope);

// If configured, remove unknown items at the very beginning. If the envelope is
// empty, we fail the request with a special control flow error to skip checks and
// queueing, that still results in a `200 OK` response.
utils::remove_unknown_items(&config, &mut envelope);

if envelope.is_empty() {
// envelope is empty, cannot send outcomes
Err(BadStoreRequest::EmptyEnvelope)
Expand Down Expand Up @@ -429,6 +435,11 @@ where
return Ok(create_response(event_id));
}

// This is a control-flow error without a bad status code.
if matches!(error, BadStoreRequest::EmptyEnvelope) {
return Ok(create_response(event_id));
jjbayer marked this conversation as resolved.
Show resolved Hide resolved
}

let response = error.error_response();
if response.status().is_server_error() {
relay_log::error!("error handling request: {}", LogError(&error));
Expand Down
Loading