Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/sentry/killswitches.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,17 @@ class KillswitchInfo:
"partition_id": "A kafka partition index.",
},
),
"profiling.killswitch.ingest-profiles": KillswitchInfo(
description="""
Drop profiles in the ingest-profiles consumer.

This happens after relay produces profiles to the topic but before a task
is started to process/ingest to profile.
""",
fields={
"project_id": "A project ID.",
},
),
}


Expand Down
7 changes: 7 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -2762,6 +2762,13 @@
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Killswitch to drop profiles in the ingest-profiles consumer, matched per
# project ID (field schema is declared alongside the other killswitches in
# src/sentry/killswitches.py). Empty default == killswitch disabled.
register(
    "profiling.killswitch.ingest-profiles",
    type=Sequence,
    default=[],
    flags=FLAG_ALLOW_EMPTY | FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)

# max number of profiles to use for computing
# the aggregated flamegraph.
register(
Expand Down
21 changes: 21 additions & 0 deletions src/sentry/profiles/consumers/process/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
from arroyo.types import Commit, Message, Partition

from sentry import options
from sentry.killswitches import killswitch_matches_context
from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step
from sentry.profiles.task import process_profile_task


def process_message(message: Message[KafkaPayload]) -> None:
if should_drop(message.payload.headers):
return

sampled = is_sampled(message.payload.headers)

if sampled or options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
Expand Down Expand Up @@ -46,3 +50,20 @@ def is_sampled(headers: Iterable[tuple[str, str | bytes]]) -> bool:
if isinstance(v, bytes):
return v.decode("utf-8") == "true"
return True


HEADER_KEYS = {"project_id"}


def should_drop(headers: Iterable[tuple[str, str | bytes]]) -> bool:
context = {}
for k, v in headers:
if k == "project_id" and isinstance(v, bytes):
context[k] = v.decode("utf-8")

if "project_id" in context and killswitch_matches_context(
"profiling.killswitch.ingest-profiles", context
):
return True

return False
116 changes: 78 additions & 38 deletions tests/sentry/profiles/consumers/test_process.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,102 @@
from __future__ import annotations

from base64 import b64encode
from collections.abc import MutableSequence
from datetime import datetime
from typing import Any
from unittest.mock import MagicMock, Mock, patch

import msgpack
import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition, Topic
from django.utils import timezone

from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory
from sentry.profiles.task import _prepare_frames_from_profile
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.options import override_options
from sentry.testutils.pytest.fixtures import django_db_all
from sentry.utils import json


class TestProcessProfileConsumerStrategy(TestCase):
@staticmethod
def processing_factory() -> ProcessProfileStrategyFactory:
return ProcessProfileStrategyFactory()

@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay")
def test_basic_profile_to_task(self, process_profile_task: MagicMock) -> None:
processing_strategy = self.processing_factory().create_with_partitions(
commit=Mock(), partitions={}
@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "2"}]})
@pytest.mark.parametrize("headers", [[], [("project_id", b"1")]])
@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay")
@django_db_all
def test_basic_profile_to_task(
    process_profile_task: MagicMock, headers: MutableSequence[tuple[str, bytes]]
) -> None:
    """A profile whose project is NOT in the killswitch is forwarded to the task.

    The killswitch is armed for project "2"; the submitted message either has
    no project header or belongs to project 1, so it must reach the task in
    both cases.
    """
    processing_strategy = ProcessProfileStrategyFactory().create_with_partitions(
        commit=Mock(), partitions={}
    )
    message_dict = {
        "organization_id": 1,
        "project_id": 1,
        "key_id": 1,
        "received": int(timezone.now().timestamp()),
        "payload": json.dumps({"platform": "android", "profile": ""}),
    }
    payload = msgpack.packb(message_dict)

    processing_strategy.submit(
        Message(
            BrokerValue(
                KafkaPayload(
                    b"key",
                    payload,
                    headers,
                ),
                Partition(Topic("profiles"), 1),
                1,
                datetime.now(),
            )
        )
    )
    processing_strategy.poll()
    processing_strategy.join(1)
    processing_strategy.terminate()

    process_profile_task.assert_called_with(
        payload=b64encode(payload).decode("utf-8"),
        sampled=True,
    )


@patch("sentry.profiles.consumers.process.factory.process_profile_task.delay")
@override_options({"profiling.killswitch.ingest-profiles": [{"project_id": "1"}]})
@django_db_all
def test_killswitch_project(process_profile_task: MagicMock) -> None:
    """A profile whose project IS in the killswitch never reaches the task.

    The killswitch is armed for project "1" and the message carries a
    ``project_id: 1`` header, so the consumer must drop it before scheduling
    the processing task.
    """
    processing_strategy = ProcessProfileStrategyFactory().create_with_partitions(
        commit=Mock(), partitions={}
    )
    message_dict = {
        "organization_id": 1,
        "project_id": 1,
        "key_id": 1,
        "received": int(timezone.now().timestamp()),
        "payload": json.dumps({"platform": "android", "profile": ""}),
    }
    payload = msgpack.packb(message_dict)

    processing_strategy.submit(
        Message(
            BrokerValue(
                KafkaPayload(
                    b"key",
                    payload,
                    [("project_id", b"1")],
                ),
                Partition(Topic("profiles"), 1),
                1,
                datetime.now(),
            )
        )
    )
    processing_strategy.poll()
    processing_strategy.join(1)
    processing_strategy.terminate()

    process_profile_task.assert_not_called()


def test_adjust_instruction_addr_sample_format() -> None:
Expand Down
Loading