Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ enum ProcessingError {

type ExtractedEvent = (Annotated<Event>, usize);

/// Determines whether the given item creates an event.
///
/// This is only true for literal events and crash report attachments.
fn is_event_item(item: &Item) -> bool {
match item.ty() {
// These items are direct event types.
ItemType::Event | ItemType::SecurityReport | ItemType::UnrealReport => true,

// Attachments are only event items if they are crash reports.
ItemType::Attachment => match item.attachment_type().unwrap_or_default() {
AttachmentType::AppleCrashReport | AttachmentType::Minidump => true,
_ => false,
},

// Form data items may contain partial event payloads, but those are only ever valid if they
// occur together with an explicit event item, such as a minidump or apple crash report. For
// this reason, FormData alone does not constitute an event item.
ItemType::FormData | ItemType::UserReport => false,
}
}

struct EventProcessor {
config: Arc<Config>,
#[cfg(feature = "processing")]
Expand Down Expand Up @@ -864,13 +885,18 @@ impl Handler<HandleEvent> for EventManager {
} = message;

let event_id = envelope.event_id();
let is_event = envelope
.get_item_by(|item| item.ty() == ItemType::Event)
.is_some();
let project_id = envelope.meta().project_id();
let remote_addr = envelope.meta().client_addr();
let meta_clone = Arc::new(envelope.meta().clone());

// Compute whether this envelope contains an event. This is used in error handling to
// appropriately emit an outecome. Envelopes not containing events (such as standalone
// attachment uploads or user reports) should never create outcomes.
let is_event = envelope.items().any(is_event_item);

// This is used to add the respective organization id to the outcome emitted in the error
// case. The organization id can only be obtained via the project state, which has not been
// loaded at this time.
let org_id_for_err = Rc::new(Mutex::new(None::<u64>));

metric!(set(RelaySets::UniqueProjects) = project_id as i64);
Expand Down Expand Up @@ -979,11 +1005,12 @@ impl Handler<HandleEvent> for EventManager {
.timeout(self.config.event_buffer_expiry(), ProcessingError::Timeout)
.map(|_, _, _| metric!(counter(RelayCounters::EventAccepted) += 1))
.map_err(clone!(project, captured_events, |error, _, _| {
// Do not track outcomes or capture events for non-event envelopes (such as
// individual attachments)
if !is_event {
return;
// Rate limits need special handling: Cache them on the project to avoid
// expensive processing while the limit is active.
if let ProcessingError::RateLimited(ref rate_limit) = error {
project.do_send(rate_limit.clone());
}

// if we are in capture mode, we stash away the event instead of
// forwarding it.
if capture {
Expand All @@ -994,6 +1021,12 @@ impl Handler<HandleEvent> for EventManager {
.insert(event_id, CapturedEvent::Err(msg));
}

// Do not track outcomes or capture events for non-event envelopes (such as
// individual attachments)
if !is_event {
return;
}

metric!(counter(RelayCounters::EventRejected) += 1);
let outcome_params = match error {
// General outcomes for invalid events
Expand All @@ -1019,13 +1052,6 @@ impl Handler<HandleEvent> for EventManager {
Some(Outcome::Invalid(DiscardReason::DuplicateItem))
}

// Rate limits need special handling: Cache them on the project to avoid
// expensive processing while the limit is active.
ProcessingError::RateLimited(ref rate_limit) => {
project.do_send(rate_limit.clone());
Some(Outcome::RateLimited(rate_limit.clone()))
}

// Processing-only outcomes (Sentry-internal Relays)
#[cfg(feature = "processing")]
ProcessingError::InvalidUnrealReport(_) => {
Expand All @@ -1039,6 +1065,10 @@ impl Handler<HandleEvent> for EventManager {
ProcessingError::EventFiltered(ref filter_stat_key) => {
Some(Outcome::Filtered(*filter_stat_key))
}
// Processing-only but not feature flagged
ProcessingError::RateLimited(ref rate_limit) => {
Some(Outcome::RateLimited(rate_limit.clone()))
}

// Internal errors
ProcessingError::SerializeFailed(_)
Expand All @@ -1064,7 +1094,7 @@ impl Handler<HandleEvent> for EventManager {
// we "expect" errors and log them as info level.
log::error!("error processing event {}: {}", event_id, LogError(&error));
} else {
log::info!("dropped event {}: {}", event_id, LogError(&error));
log::debug!("dropped event {}: {}", event_id, LogError(&error));
}

if let Some(outcome) = outcome_params {
Expand Down
22 changes: 22 additions & 0 deletions tests/integration/test_minidump.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,25 @@ def test_minidump_with_processing(
"chunks": num_chunks,
}
]


def test_minidump_ratelimit(mini_sentry, relay_with_processing, outcomes_consumer):
relay = relay_with_processing()
relay.wait_relay_healthcheck()

project_config = mini_sentry.project_configs[42] = mini_sentry.full_project_config()
(key_config,) = project_config["publicKeys"]
key_config["quotas"] = [{"limit": 0, "reasonCode": "static_disabled_quota",}]

outcomes_consumer = outcomes_consumer()
attachments = [(MINIDUMP_ATTACHMENT_NAME, "minidump.dmp", "MDMP content")]

# First minidump returns 200 but is rate limited in processing
relay.send_minidump(project_id=42, files=attachments)
outcomes_consumer.assert_rate_limited("static_disabled_quota")

# Second minidump returns 429 in endpoint
with pytest.raises(HTTPError) as excinfo:
relay.send_minidump(project_id=42, files=attachments)
assert excinfo.value.response.status_code == 429
outcomes_consumer.assert_rate_limited("static_disabled_quota")