diff --git a/src/sentry/killswitches.py b/src/sentry/killswitches.py
index 6cad0657d4ded5..3a5ef067c3e04b 100644
--- a/src/sentry/killswitches.py
+++ b/src/sentry/killswitches.py
@@ -228,6 +228,17 @@ class KillswitchInfo:
             "partition_id": "A kafka partition index.",
         },
     ),
+    "profiling.killswitch.ingest-profiles": KillswitchInfo(
+        description="""
+        Drop profiles in the ingest-profiles consumer.
+
+        This happens after relay produces profiles to the topic but before a task
+        is started to process/ingest the profile.
+        """,
+        fields={
+            "project_id": "A project ID.",
+        },
+    ),
 }
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index c7a28143ae96ae..fea35ea0020b85 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -2762,6 +2762,13 @@
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
 
+register(
+    "profiling.killswitch.ingest-profiles",
+    type=Sequence,
+    default=[],
+    flags=FLAG_ALLOW_EMPTY | FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
+)
+
 # max number of profiles to use for computing
 # the aggregated flamegraph.
 register(
diff --git a/src/sentry/profiles/consumers/process/factory.py b/src/sentry/profiles/consumers/process/factory.py
index 53aef80e949615..f8df79d78a5466 100644
--- a/src/sentry/profiles/consumers/process/factory.py
+++ b/src/sentry/profiles/consumers/process/factory.py
@@ -8,11 +8,15 @@
 from arroyo.types import Commit, Message, Partition
 
 from sentry import options
+from sentry.killswitches import killswitch_matches_context
 from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step
 from sentry.profiles.task import process_profile_task
 
 
 def process_message(message: Message[KafkaPayload]) -> None:
+    if should_drop(message.payload.headers):
+        return
+
     sampled = is_sampled(message.payload.headers)
 
     if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
@@ -46,3 +50,20 @@ def is_sampled(headers: Iterable[tuple[str, str | bytes]]) -> bool:
             if isinstance(v, bytes):
                 return v.decode("utf-8") == "true"
     return True
+
+
+HEADER_KEYS = {"project_id"}
+
+
+def should_drop(headers: Iterable[tuple[str, str | bytes]]) -> bool:
+    context = {}
+    for k, v in headers:
+        if k == "project_id" and isinstance(v, bytes):
+            context[k] = v.decode("utf-8")
+
+    if "project_id" in context and killswitch_matches_context(
+        "profiling.killswitch.ingest-profiles", context
+    ):
+        return True
+
+    return False
diff --git a/tests/sentry/profiles/consumers/test_process.py b/tests/sentry/profiles/consumers/test_process.py
index 92ccc9f4856ebf..0250565bb603a6 100644
--- a/tests/sentry/profiles/consumers/test_process.py
+++ b/tests/sentry/profiles/consumers/test_process.py
@@ -1,62 +1,102 @@
 from __future__ import annotations
 
 from base64 import b64encode
+from collections.abc import MutableSequence
 from datetime import datetime
 from typing import Any
 from unittest.mock import MagicMock, Mock, patch
 
 import msgpack
+import pytest
 from arroyo.backends.kafka import KafkaPayload
 from arroyo.types import BrokerValue, Message, Partition, Topic
 from django.utils import timezone
 
 from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory
 from sentry.profiles.task import _prepare_frames_from_profile
-from sentry.testutils.cases import TestCase
+from sentry.testutils.helpers.options import override_options
+from sentry.testutils.pytest.fixtures import django_db_all
 from sentry.utils import json
 
 
-class TestProcessProfileConsumerStrategy(TestCase):
-    @staticmethod
-    def processing_factory() -> ProcessProfileStrategyFactory:
-        return ProcessProfileStrategyFactory()
-
-    @patch("sentry.profiles.consumers.process.factory.process_profile_task.delay")
-    def test_basic_profile_to_task(self, process_profile_task: MagicMock) -> None:
-        processing_strategy = self.processing_factory().create_with_partitions(
-            commit=Mock(), partitions={}
+@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "2"}]})
+@pytest.mark.parametrize("headers", [[], [("project_id", b"1")]])
+@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay")
+@django_db_all
+def test_basic_profile_to_task(
+    process_profile_task: MagicMock, headers: MutableSequence[tuple[str, bytes]]
+) -> None:
+    processing_strategy = ProcessProfileStrategyFactory().create_with_partitions(
+        commit=Mock(), partitions={}
+    )
+    message_dict = {
+        "organization_id": 1,
+        "project_id": 1,
+        "key_id": 1,
+        "received": int(timezone.now().timestamp()),
+        "payload": json.dumps({"platform": "android", "profile": ""}),
+    }
+    payload = msgpack.packb(message_dict)
+
+    processing_strategy.submit(
+        Message(
+            BrokerValue(
+                KafkaPayload(
+                    b"key",
+                    payload,
+                    headers,
+                ),
+                Partition(Topic("profiles"), 1),
+                1,
+                datetime.now(),
+            )
         )
-        message_dict = {
-            "organization_id": 1,
-            "project_id": 1,
-            "key_id": 1,
-            "received": int(timezone.now().timestamp()),
-            "payload": json.dumps({"platform": "android", "profile": ""}),
-        }
-        payload = msgpack.packb(message_dict)
-
-        processing_strategy.submit(
-            Message(
-                BrokerValue(
-                    KafkaPayload(
-                        b"key",
-                        payload,
-                        [],
-                    ),
-                    Partition(Topic("profiles"), 1),
-                    1,
-                    datetime.now(),
-                )
+    )
+    processing_strategy.poll()
+    processing_strategy.join(1)
+    processing_strategy.terminate()
+
+    process_profile_task.assert_called_with(
+        payload=b64encode(payload).decode("utf-8"),
+        sampled=True,
+    )
+
+
+@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay")
+@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "1"}]})
+@django_db_all
+def test_killswitch_project(process_profile_task: MagicMock) -> None:
+    processing_strategy = ProcessProfileStrategyFactory().create_with_partitions(
+        commit=Mock(), partitions={}
+    )
+    message_dict = {
+        "organization_id": 1,
+        "project_id": 1,
+        "key_id": 1,
+        "received": int(timezone.now().timestamp()),
+        "payload": json.dumps({"platform": "android", "profile": ""}),
+    }
+    payload = msgpack.packb(message_dict)
+
+    processing_strategy.submit(
+        Message(
+            BrokerValue(
+                KafkaPayload(
+                    b"key",
+                    payload,
+                    [("project_id", b"1")],
+                ),
+                Partition(Topic("profiles"), 1),
+                1,
+                datetime.now(),
            )
        )
-        processing_strategy.poll()
-        processing_strategy.join(1)
-        processing_strategy.terminate()
+    )
+    processing_strategy.poll()
+    processing_strategy.join(1)
+    processing_strategy.terminate()
 
-        process_profile_task.assert_called_with(
-            payload=b64encode(payload).decode("utf-8"),
-            sampled=True,
-        )
+    process_profile_task.assert_not_called()
 
 
 def test_adjust_instruction_addr_sample_format() -> None:
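
For context on how the new option is meant to be consumed: the killswitch value is a list of condition dicts (as exercised via override_options in the tests above), and a profile is dropped when the project_id extracted from the Kafka message headers matches one of those conditions. The snippet below is only a minimal, self-contained sketch of that matching idea, not Sentry's actual killswitch_matches_context implementation (which handles more fields and wildcard semantics):

from collections.abc import Mapping, Sequence
from typing import Any


def matches(conditions: Sequence[Mapping[str, Any]], context: Mapping[str, Any]) -> bool:
    # A message is dropped when every field of at least one configured
    # condition equals the corresponding value in the message context.
    return any(
        bool(condition) and all(context.get(k) == v for k, v in condition.items())
        for condition in conditions
    )


# Mirrors the option value used in test_killswitch_project above.
killswitch = [{"project_id": "1"}]
assert matches(killswitch, {"project_id": "1"})      # dropped
assert not matches(killswitch, {"project_id": "2"})  # ingested normally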