Skip to content
4 changes: 4 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,10 @@ def SOCIAL_AUTH_DEFAULT_USERNAME():
Queue(
"events.reprocessing.symbolicate_event", routing_key="events.reprocessing.symbolicate_event"
),
Queue(
"events.reprocessing.symbolicate_event_low_priority",
routing_key="events.reprocessing.symbolicate_event_low_priority",
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will need ops to make sure that this queue is handled exclusively by the same workers which handle the events.symbolicate_event_low_priority queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, thanks for the heads up! when you phrase it this way it seems like maybe it's better to just throw reprocessing requests back into the LPQ... i'll start a conversation about this and change this later if it turns out that would be preferable. i assume by having a separate queue for reprocessing requests we could least easily drop everything in that queue without negatively impacting new requests.

Queue("events.save_event", routing_key="events.save_event"),
Queue("events.symbolicate_event", routing_key="events.symbolicate_event"),
Queue(
Expand Down
60 changes: 58 additions & 2 deletions src/sentry/tasks/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,41 @@ def submit_process(
)


def should_demote_symbolication(project_id):
"""
Determines whether a project's symbolication events should be pushed to the low priority queue.
"""
always_lowpri = killswitch_matches_context(
"store.symbolicate-event-lpq-always",
{
"project_id": project_id,
},
)
never_lowpri = killswitch_matches_context(
"store.symbolicate-event-lpq-never",
{
"project_id": project_id,
},
)
return not never_lowpri and always_lowpri


def submit_symbolicate(project, from_reprocessing, cache_key, event_id, start_time, data):
task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event
task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id)


def submit_symbolicate_low_priority(
project, from_reprocessing, cache_key, event_id, start_time, data
):
task = (
symbolicate_event_from_reprocessing_low_priority
if from_reprocessing
else symbolicate_event_low_priority
)
task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id)


def submit_save_event(project, from_reprocessing, cache_key, event_id, start_time, data):
if cache_key:
data = None
Expand Down Expand Up @@ -132,8 +162,16 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr

if should_process_with_symbolicator(data):
reprocessing2.backup_unprocessed_event(project=project, data=original_data)
submit_symbolicate(
project, from_reprocessing, cache_key, event_id, start_time, original_data

is_low_priority = should_demote_symbolication(project_id)
task = submit_symbolicate_low_priority if is_low_priority else submit_symbolicate
task(
project,
from_reprocessing,
cache_key,
event_id,
start_time,
original_data,
)
return

Expand Down Expand Up @@ -386,6 +424,24 @@ def symbolicate_event_from_reprocessing(cache_key, start_time=None, event_id=Non
)


@instrumented_task(
name="sentry.tasks.store.symbolicate_event_from_reprocessing_low_priority",
queue="events.reprocessing.symbolicate_event_low_priority",
time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30,
soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20,
acks_late=True,
)
def symbolicate_event_from_reprocessing_low_priority(
cache_key, start_time=None, event_id=None, **kwargs
):
return _do_symbolicate_event(
cache_key=cache_key,
start_time=start_time,
event_id=event_id,
symbolicate_task=symbolicate_event_from_reprocessing_low_priority,
)


@instrumented_task(
name="sentry.tasks.store.retry_process_event",
queue="sleep",
Expand Down
63 changes: 63 additions & 0 deletions tests/sentry/tasks/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
preprocess_event,
process_event,
save_event,
should_demote_symbolication,
symbolicate_event,
time_synthetic_monitoring_event,
)
from sentry.testutils.helpers.options import override_options
from sentry.utils.compat import mock

EVENT_ID = "cc3e6c2bb6b6498097f336d1e6979f4b"
Expand Down Expand Up @@ -61,6 +63,12 @@ def mock_symbolicate_event():
yield m


@pytest.fixture
def mock_symbolicate_event_low_priority():
with mock.patch("sentry.tasks.store.symbolicate_event_low_priority") as m:
yield m


@pytest.fixture
def mock_get_symbolication_function():
with mock.patch("sentry.lang.native.processing.get_symbolication_function") as m:
Expand Down Expand Up @@ -125,6 +133,33 @@ def test_move_to_symbolicate_event(
assert mock_save_event.delay.call_count == 0


@pytest.mark.django_db
def test_move_to_symbolicate_event_low_priority(
default_project,
mock_process_event,
mock_save_event,
mock_symbolicate_event,
mock_symbolicate_event_low_priority,
register_plugin,
):
with override_options({"store.symbolicate-event-lpq-always": [default_project.id]}):
register_plugin(globals(), BasicPreprocessorPlugin)
data = {
"project": default_project.id,
"platform": "native",
"logentry": {"formatted": "test"},
"event_id": EVENT_ID,
"extra": {"foo": "bar"},
}

preprocess_event(data=data)

assert mock_symbolicate_event_low_priority.delay.call_count == 1
assert mock_symbolicate_event.delay.call_count == 0
assert mock_process_event.delay.call_count == 0
assert mock_save_event.delay.call_count == 0


@pytest.mark.django_db
def test_symbolicate_event_call_process_inline(
default_project,
Expand Down Expand Up @@ -409,3 +444,31 @@ def test_time_synthetic_monitoring_event_in_save_event(mock_metrics_timing):
mock.ANY,
)
assert to_process.kwargs == {"tags": tags, "sample_rate": 1.0}


@pytest.mark.django_db
def test_should_demote_symbolication_empty(default_project):
assert not should_demote_symbolication(default_project.id)


@pytest.mark.django_db
def test_should_demote_symbolication_always(default_project):
with override_options({"store.symbolicate-event-lpq-always": [default_project.id]}):
assert should_demote_symbolication(default_project.id)


@pytest.mark.django_db
def test_should_demote_symbolication_never(default_project):
with override_options({"store.symbolicate-event-lpq-never": [default_project.id]}):
assert not should_demote_symbolication(default_project.id)


@pytest.mark.django_db
def test_should_demote_symbolication_always_and_never(default_project):
with override_options(
{
"store.symbolicate-event-lpq-never": [default_project.id],
"store.symbolicate-event-lpq-always": [default_project.id],
}
):
assert not should_demote_symbolication(default_project.id)