diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ada767ecda3..4e8cf0ebae7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -508,7 +508,7 @@ jobs: test_integration: name: Integration Tests runs-on: ubuntu-latest - timeout-minutes: 20 + timeout-minutes: 30 # Skip redundant checks for library releases if: "!startsWith(github.ref, 'refs/heads/release-library/')" diff --git a/CHANGELOG.md b/CHANGELOG.md index aef9b78171a..6e9333d66c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,6 @@ - Use custom wildcard matching instead of regular expressions. ([#4073](https://github.com/getsentry/relay/pull/4073)) - Allowlist the SentryUptimeBot user-agent. ([#4068](https://github.com/getsentry/relay/pull/4068)) - Feature flags of graduated features are now hard-coded in Relay so they can be removed from Sentry. ([#4076](https://github.com/getsentry/relay/pull/4076), [#4080](https://github.com/getsentry/relay/pull/4080)) -- Prevent span extraction when quota is active to reduce load on redis. ([#4097](https://github.com/getsentry/relay/pull/4097)) ## 24.9.0 diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index b53df37f6b7..afc9b3fb787 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -53,7 +53,6 @@ use smallvec::SmallVec; use crate::constants::DEFAULT_EVENT_RETENTION; use crate::extractors::{PartialMeta, RequestMeta}; -use crate::utils::SeqCount; pub const CONTENT_TYPE: &str = "application/x-sentry-envelope"; @@ -872,27 +871,6 @@ impl Item { self.headers.other.insert(name.into(), value.into()) } - /// Counts how many spans are contained in a transaction payload. - /// - /// The transaction itself represents a span as well, so this function returns - /// `len(event.spans) + 1`. - /// - /// Returns zero if - /// - the item is not a transaction, - /// - the spans have already been extracted (in which case they are represented elsewhere). - pub fn count_nested_spans(&self) -> usize { - #[derive(Debug, Deserialize)] - struct PartialEvent { - spans: SeqCount, - } - - if self.ty() != &ItemType::Transaction || self.spans_extracted() { - return 0; - } - - serde_json::from_slice::(&self.payload()).map_or(0, |event| event.spans.0 + 1) - } - /// Determines whether the given item creates an event. /// /// This is only true for literal events and crash report attachments. diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs index 98b76fd22b4..09a06fa6a2b 100644 --- a/relay-server/src/metrics_extraction/event.rs +++ b/relay-server/src/metrics_extraction/event.rs @@ -49,12 +49,14 @@ impl Extractable for Span { /// If this is a transaction event with spans, metrics will also be extracted from the spans. pub fn extract_metrics( event: &mut Event, + spans_extracted: bool, config: CombinedMetricExtractionConfig<'_>, max_tag_value_size: usize, span_extraction_sample_rate: Option, ) -> Vec { let mut metrics = generic::extract_metrics(event, config); - if sample(span_extraction_sample_rate.unwrap_or(1.0)) { + // If spans were already extracted for an event, we rely on span processing to extract metrics. + if !spans_extracted && sample(span_extraction_sample_rate.unwrap_or(1.0)) { extract_span_metrics_for_event(event, config, max_tag_value_size, &mut metrics); } @@ -1200,6 +1202,7 @@ mod tests { extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config(features, None).combined(), 200, None, @@ -1410,6 +1413,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1466,6 +1470,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1497,6 +1502,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1759,6 +1765,7 @@ mod tests { let metrics = extract_metrics( event.value_mut().as_mut().unwrap(), + false, combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, @@ -1899,7 +1906,13 @@ mod tests { ); let config = binding.combined(); - let _ = extract_metrics(event.value_mut().as_mut().unwrap(), config, 200, None); + let _ = extract_metrics( + event.value_mut().as_mut().unwrap(), + false, + config, + 200, + None, + ); insta::assert_debug_snapshot!(&event.value().unwrap()._metrics_summary); insta::assert_debug_snapshot!( diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index d04a8e33105..9b09ffa6d0c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1431,6 +1431,7 @@ impl EnvelopeProcessorService { let metrics = crate::metrics_extraction::event::extract_metrics( event, + state.spans_extracted, combined_config, self.inner .config diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index b198d686b32..778eca9f897 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -12,6 +12,7 @@ use relay_system::{Addr, BroadcastChannel}; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use crate::envelope::ItemType; use crate::services::metrics::{Aggregator, MergeBuckets}; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProcessProjectMetrics}; @@ -19,6 +20,7 @@ use crate::services::project::state::ExpiryState; use crate::services::project_cache::{ CheckedEnvelope, ProcessMetrics, ProjectCache, RequestUpdate, }; +use crate::utils::{Enforcement, SeqCount}; use crate::statsd::RelayCounters; use crate::utils::{EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; @@ -555,9 +557,19 @@ impl Project { Ok(current_limits.check_with_quotas(quotas, item_scoping)) }); - let (enforcement, mut rate_limits) = + let (mut enforcement, mut rate_limits) = envelope_limiter.compute(envelope.envelope_mut(), &scoping)?; + let check_nested_spans = state + .as_ref() + .is_some_and(|s| s.has_feature(Feature::ExtractSpansFromEvent)); + + // If we can extract spans from the event, we want to try and count the number of nested + // spans to correctly emit negative outcomes in case the transaction itself is dropped. + if check_nested_spans { + sync_spans_to_enforcement(&envelope, &mut enforcement); + } + enforcement.apply_with_outcomes(&mut envelope); envelope.update(); @@ -586,9 +598,52 @@ impl Project { } } +/// Adds category limits for the nested spans inside a transaction. +/// +/// On the fast path of rate limiting, we do not have nested spans of a transaction extracted +/// as top-level spans, thus if we limited a transaction, we want to count and emit negative +/// outcomes for each of the spans nested inside that transaction. +fn sync_spans_to_enforcement(envelope: &ManagedEnvelope, enforcement: &mut Enforcement) { + if !enforcement.is_event_active() { + return; + } + + let spans_count = count_nested_spans(envelope); + if spans_count == 0 { + return; + } + + if enforcement.event.is_active() { + enforcement.spans = enforcement.event.clone_for(DataCategory::Span, spans_count); + } + + if enforcement.event_indexed.is_active() { + enforcement.spans_indexed = enforcement + .event_indexed + .clone_for(DataCategory::SpanIndexed, spans_count); + } +} + +/// Counts the nested spans inside the first transaction envelope item inside the [`Envelope`](crate::envelope::Envelope). +fn count_nested_spans(envelope: &ManagedEnvelope) -> usize { + #[derive(Debug, Deserialize)] + struct PartialEvent { + spans: SeqCount, + } + + envelope + .envelope() + .items() + .find(|item| *item.ty() == ItemType::Transaction && !item.spans_extracted()) + .and_then(|item| serde_json::from_slice::(&item.payload()).ok()) + // We do + 1, since we count the transaction itself because it will be extracted + // as a span and counted during the slow path of rate limiting. + .map_or(0, |event| event.spans.0 + 1) +} + #[cfg(test)] mod tests { - use crate::envelope::{ContentType, Envelope, Item, ItemType}; + use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; use crate::services::processor::ProcessingGroup; use relay_base_schema::project::ProjectId; @@ -720,7 +775,27 @@ mod tests { RequestMeta::new(dsn) } - const EVENT_WITH_SPANS: &str = r#"{ + #[test] + fn test_track_nested_spans_outcomes() { + let mut project = create_project(Some(json!({ + "features": [ + "organizations:indexed-spans-extraction" + ], + "quotas": [{ + "id": "foo", + "categories": ["transaction"], + "window": 3600, + "limit": 0, + "reasonCode": "foo", + }] + }))); + + let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); + + let mut transaction = Item::new(ItemType::Transaction); + transaction.set_payload( + ContentType::Json, + r#"{ "event_id": "52df9022835246eeb317dbd739ccd059", "type": "transaction", "transaction": "I have a stale timestamp, but I'm recent!", @@ -746,27 +821,8 @@ mod tests { "trace_id": "ff62a8b040f340bda5d830223def1d81" } ] -}"#; - - #[test] - fn test_track_nested_spans_outcomes() { - let mut project = create_project(Some(json!({ - "features": [ - "organizations:indexed-spans-extraction" - ], - "quotas": [{ - "id": "foo", - "categories": ["transaction"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - - let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); - - let mut transaction = Item::new(ItemType::Transaction); - transaction.set_payload(ContentType::Json, EVENT_WITH_SPANS); +}"#, + ); envelope.add_item(transaction); @@ -796,59 +852,4 @@ mod tests { assert_eq!(outcome.quantity, expected_quantity); } } - - #[test] - fn test_track_nested_spans_outcomes_span_quota() { - let mut project = create_project(Some(json!({ - "features": [ - "organizations:indexed-spans-extraction" - ], - "quotas": [{ - "id": "foo", - "categories": ["span_indexed"], - "window": 3600, - "limit": 0, - "reasonCode": "foo", - }] - }))); - - let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta()); - - let mut transaction = Item::new(ItemType::Transaction); - transaction.set_payload(ContentType::Json, EVENT_WITH_SPANS); - - envelope.add_item(transaction); - - let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); - let (test_store, _) = Addr::custom(); - - let managed_envelope = ManagedEnvelope::new( - envelope, - outcome_aggregator.clone(), - test_store, - ProcessingGroup::Transaction, - ); - - let CheckedEnvelope { - envelope, - rate_limits: _, - } = project.check_envelope(managed_envelope).unwrap(); - let envelope = envelope.unwrap(); - let transaction_item = envelope - .envelope() - .items() - .find(|i| *i.ty() == ItemType::Transaction) - .unwrap(); - assert!(transaction_item.spans_extracted()); - - drop(outcome_aggregator); - - let expected = [(DataCategory::SpanIndexed, 3)]; - - for (expected_category, expected_quantity) in expected { - let outcome = outcome_aggregator_rx.blocking_recv().unwrap(); - assert_eq!(outcome.category, expected_category); - assert_eq!(outcome.quantity, expected_quantity); - } - } } diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index db18c0b2341..4adaf92c53a 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -217,9 +217,6 @@ impl EnvelopeSummary { summary.profile_quantity += source_quantities.profiles; } - // Also count nested spans: - summary.span_quantity += item.count_nested_spans(); - summary.payload_size += item.len(); summary.set_quantity(item); } @@ -451,7 +448,6 @@ impl Enforcement { envelope .envelope_mut() .retain_items(|item| self.retain_item(item)); - self.track_outcomes(envelope); } @@ -462,12 +458,6 @@ impl Enforcement { return false; } - if item.ty() == &ItemType::Transaction && self.spans_indexed.is_active() { - // We cannot remove nested spans from the transaction, but we can prevent them - // from being extracted into standalone spans. - item.set_spans_extracted(true); - } - // When checking limits for categories that have an indexed variant, // we only have to check the more specific, the indexed, variant // to determine whether an item is limited. diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index 91e87306083..ef157ea2a57 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -1206,21 +1206,19 @@ def test_no_transaction_metrics_when_filtered(mini_sentry, relay): relay = relay(mini_sentry, options=TEST_CONFIG) relay.send_transaction(project_id, tx) - # The only envelopes received should be outcomes for {Span,Transaction}[Indexed]?: - reports = [mini_sentry.get_client_report() for _ in range(4)] + # The only envelopes received should be outcomes for Transaction{,Indexed}: + reports = [mini_sentry.get_client_report() for _ in range(2)] filtered_events = [ outcome for report in reports for outcome in report["filtered_events"] ] filtered_events.sort(key=lambda x: x["category"]) assert filtered_events == [ - {"reason": "release-version", "category": "span", "quantity": 2}, - {"reason": "release-version", "category": "span_indexed", "quantity": 2}, {"reason": "release-version", "category": "transaction", "quantity": 1}, {"reason": "release-version", "category": "transaction_indexed", "quantity": 1}, ] - assert mini_sentry.captured_events.empty + assert mini_sentry.captured_events.empty() def test_transaction_name_too_long( diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 99c143a376e..f1f15910f22 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -254,7 +254,7 @@ def _send_event(relay, project_id=42, event_type="error", event_id=None, trace_i return event_id -@pytest.mark.parametrize("event_type", ["transaction"]) +@pytest.mark.parametrize("event_type", ["error", "transaction"]) def test_outcomes_non_processing(relay, mini_sentry, event_type): """ Test basic outcome functionality. @@ -272,12 +272,10 @@ def test_outcomes_non_processing(relay, mini_sentry, event_type): [ DataCategory.TRANSACTION, DataCategory.TRANSACTION_INDEXED, - DataCategory.SPAN, - DataCategory.SPAN_INDEXED, ] if event_type == "transaction" - else [1] - ) # Error + else [DataCategory.ERROR] + ) outcomes = [] for _ in expected_categories: @@ -481,7 +479,7 @@ def test_outcome_forwarding( _send_event(downstream_relay, event_type=event_type) - expected_categories = [1] if event_type == "error" else [2, 9, 12, 16] + expected_categories = [1] if event_type == "error" else [2, 9] outcomes = outcomes_consumer.get_outcomes(n=len(expected_categories)) outcomes.sort(key=lambda x: x["category"]) @@ -754,7 +752,7 @@ def _get_span_payload(): "category,outcome_categories", [ ("session", []), - ("transaction", ["transaction", "transaction_indexed", "span", "span_indexed"]), + ("transaction", ["transaction", "transaction_indexed"]), ("user_report_v2", ["user_report_v2"]), ], ) @@ -771,10 +769,10 @@ def test_outcomes_rate_limit( relay = relay_with_processing(config) project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) - reason_code = "banned" + reason_code = "transactions are banned" project_config["config"]["quotas"] = [ { - "id": "some_id", + "id": "transaction category", "categories": [category], "limit": 0, "window": 1600, @@ -1644,19 +1642,17 @@ def test_profile_outcomes_rate_limited( outcomes = outcomes_consumer.get_outcomes() outcomes.sort(key=lambda o: sorted(o.items())) - expected_categories_and_quantities = [ - (DataCategory.PROFILE, 1), - (DataCategory.PROFILE_INDEXED, 1), + expected_categories = [ + DataCategory.PROFILE, + DataCategory.PROFILE_INDEXED, ] # Profile, ProfileIndexed if quota_category == "transaction": # Transaction got rate limited as well: - expected_categories_and_quantities += [ - (DataCategory.TRANSACTION, 1), - (DataCategory.TRANSACTION_INDEXED, 1), - (DataCategory.SPAN, 2), - (DataCategory.SPAN_INDEXED, 2), + expected_categories += [ + DataCategory.TRANSACTION, + DataCategory.TRANSACTION_INDEXED, ] # Transaction, TransactionIndexed - expected_categories_and_quantities.sort() + expected_categories.sort() expected_outcomes = [ { @@ -1665,11 +1661,11 @@ def test_profile_outcomes_rate_limited( "org_id": 1, "outcome": 2, # RateLimited "project_id": 42, - "quantity": quantity, + "quantity": 1, "reason": "profiles_exceeded", "timestamp": time_within_delta(), } - for (category, quantity) in expected_categories_and_quantities + for category in expected_categories ] assert outcomes == expected_outcomes, outcomes @@ -1930,6 +1926,14 @@ def test_span_outcomes_invalid( # Create an envelope with an invalid profile: def make_envelope(): envelope = Envelope() + payload = _get_event_payload("transaction") + payload["spans"][0].pop("span_id", None) + envelope.add_item( + Item( + payload=PayloadRef(bytes=json.dumps(payload).encode()), + type="transaction", + ) + ) payload = _get_span_payload() payload.pop("span_id", None) envelope.add_item( @@ -1943,7 +1947,7 @@ def make_envelope(): envelope = make_envelope() upstream.send_envelope(project_id, envelope) - outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=2) + outcomes = outcomes_consumer.get_outcomes(timeout=10.0, n=4) outcomes.sort(key=lambda o: sorted(o.items())) assert outcomes == [ @@ -1954,13 +1958,15 @@ def make_envelope(): "outcome": 3, # Invalid "project_id": 42, "quantity": 1, - "reason": "invalid_span", + "reason": reason, "source": "pop-relay", "timestamp": time_within_delta(), } - for category in [ - DataCategory.SPAN, - DataCategory.SPAN_INDEXED, + for (category, reason) in [ + (DataCategory.TRANSACTION, "invalid_transaction"), + (DataCategory.TRANSACTION_INDEXED, "invalid_transaction"), + (DataCategory.SPAN, "invalid_span"), + (DataCategory.SPAN_INDEXED, "invalid_span"), ] ] diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index fe46a93ed77..bd883b5ed88 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -8,12 +8,10 @@ from requests import HTTPError from sentry_sdk.envelope import Envelope, Item, PayloadRef - from .consts import ( METRICS_EXTRACTION_MIN_SUPPORTED_VERSION, TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, ) -from .test_envelope import generate_transaction_item from .test_metrics import TEST_CONFIG from .test_store import make_transaction @@ -1268,84 +1266,6 @@ def summarize_outcomes(): assert usage_metrics() == (1, 2) -def test_rate_limit_spans_without_redis( - mini_sentry, - relay, -): - """Rate limits for total spans are enforced and no metrics are emitted.""" - relay = relay(mini_sentry, TEST_CONFIG) - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["features"] = [ - "projects:span-metrics-extraction", - ] - project_config["config"]["transactionMetrics"] = { - "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION - } - - project_config["config"]["quotas"] = [ - { - "categories": ["span_indexed"], - "limit": 0, - "window": int(datetime.now(UTC).timestamp()), - "id": uuid.uuid4(), - "reasonCode": "foo", - }, - ] - project_config["config"]["sampling"] = ( - { # Drop everything, to trigger metrics extraction - "version": 2, - "rules": [ - { - "id": 1, - "samplingValue": {"type": "sampleRate", "value": 0.0}, - "type": "transaction", - "condition": {"op": "and", "inner": []}, - } - ], - } - ) - - # Send an error event to populate the project cache: - relay.send_event(project_id) - envelope = mini_sentry.captured_events.get() - assert [item.type for item in envelope.items] == ["event"] - - with pytest.raises(HTTPError) as e: - relay.send_transaction(project_id, generate_transaction_item()) - assert ( - e.value.response.headers["x-sentry-rate-limits"] - == "60:span_indexed:organization:foo" - ) - - # Spans were rate limited - client_report = mini_sentry.get_client_report() - del client_report["timestamp"] - assert client_report == { - "discarded_events": [], - "rate_limited_events": [ - {"reason": "foo", "category": "span_indexed", "quantity": 2} - ], - } - - # Transaction was dynamically sampled - client_report = mini_sentry.get_client_report() - del client_report["timestamp"] - assert client_report == { - "discarded_events": [], - "filtered_sampling_events": [ - {"reason": "Sampled:0", "category": "transaction_indexed", "quantity": 1} - ], - } - - # Metrics were received regardless - metrics = mini_sentry.get_metrics() - assert any(metric["name"] == "c:spans/usage@none" for metric in metrics) - - # Nothing else was sent upstream - assert mini_sentry.captured_events.empty() - - @pytest.mark.parametrize( "tags, expected_tags", [