diff --git a/mypy.ini b/mypy.ini
index e56638ab4810dc..8a50427748348c 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -57,6 +57,8 @@ files = src/sentry/api/bases/external_actor.py,
         src/sentry/spans/**/*.py,
         src/sentry/tasks/app_store_connect.py,
         src/sentry/tasks/low_priority_symbolication.py,
+        src/sentry/tasks/store.py,
+        src/sentry/tasks/symbolication.py,
         src/sentry/tasks/update_user_reports.py,
         src/sentry/unmerge.py,
         src/sentry/utils/appleconnect/,
diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py
index 690fe22c646ae7..5e346df86f5777 100644
--- a/src/sentry/tasks/store.py
+++ b/src/sentry/tasks/store.py
@@ -1,6 +1,7 @@
 import logging
 from datetime import datetime
 from time import time
+from typing import Any, Callable, Dict, List, Optional
 
 import sentry_sdk
 from django.conf import settings
@@ -12,6 +13,7 @@
 from sentry.constants import DEFAULT_STORE_NORMALIZER_ARGS
 from sentry.datascrubbing import scrub_data
 from sentry.eventstore import processing
+from sentry.eventstore.processing.base import Event
 from sentry.killswitches import killswitch_matches_context
 from sentry.models import Activity, Organization, Project, ProjectOption
 from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces
@@ -33,9 +35,13 @@ class RetryProcessing(Exception):
     pass
 
 
-@metrics.wraps("should_process")
-def should_process(data):
+@metrics.wraps("should_process")  # type: ignore
+def should_process(data: CanonicalKeyDict) -> bool:
     """Quick check if processing is needed at all."""
+    return _should_process_inner(data)
+
+
+def _should_process_inner(data: CanonicalKeyDict) -> bool:
     from sentry.plugins.base import plugins
 
     if data.get("type") == "transaction":
         return False
@@ -59,13 +65,13 @@ def should_process(data):
 
 
 def submit_process(
-    project,
-    from_reprocessing,
-    cache_key,
-    event_id,
-    start_time,
-    data_has_changed=None,
-):
+    project: Optional[Project],
+    from_reprocessing: bool,
+    cache_key: str,
+    event_id: Optional[str],
+    start_time: Optional[int],
+    data_has_changed: bool = False,
+) -> None:
     task = process_event_from_reprocessing if from_reprocessing else process_event
     task.delay(
         cache_key=cache_key,
@@ -75,7 +81,14 @@ def submit_process(
     )
 
 
-def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data):
+def submit_save_event(
+    project_id: int,
+    from_reprocessing: bool,
+    cache_key: Optional[str],
+    event_id: Optional[str],
+    start_time: Optional[int],
+    data: Optional[Event],
+) -> None:
     if cache_key:
         data = None
 
@@ -90,7 +103,14 @@ def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start
     )
 
 
-def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project):
+def _do_preprocess_event(
+    cache_key: str,
+    data: Optional[Event],
+    start_time: Optional[int],
+    event_id: Optional[str],
+    process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None],
+    project: Optional[Project],
+) -> None:
     from sentry.lang.native.processing import should_process_with_symbolicator
     from sentry.tasks.symbolication import should_demote_symbolication, submit_symbolicate
 
@@ -147,15 +167,20 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr
     submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, original_data)
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.preprocess_event",
     queue="events.preprocess_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def preprocess_event(
-    cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs
-):
+    cache_key: str,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project: Optional[Project] = None,
+    **kwargs: Any,
+) -> None:
     return _do_preprocess_event(
         cache_key=cache_key,
         data=data,
@@ -166,15 +191,20 @@ def preprocess_event(
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.preprocess_event_from_reprocessing",
     queue="events.reprocessing.preprocess_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def preprocess_event_from_reprocessing(
-    cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs
-):
+    cache_key: str,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project: Optional[Project] = None,
+    **kwargs: Any,
+) -> None:
     return _do_preprocess_event(
         cache_key=cache_key,
         data=data,
@@ -185,13 +215,13 @@ def preprocess_event_from_reprocessing(
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.retry_process_event",
     queue="sleep",
     time_limit=(60 * 5) + 5,
     soft_time_limit=60 * 5,
 )
-def retry_process_event(process_task_name, task_kwargs, **kwargs):
+def retry_process_event(process_task_name: str, task_kwargs: Dict[str, Any], **kwargs: Any) -> None:
     """
     The only purpose of this task is be enqueued with some ETA set. This is
     essentially an implementation of ETAs on top of Celery's existing ETAs, but
@@ -210,14 +240,14 @@ def retry_process_event(process_task_name, task_kwargs, **kwargs):
 
 
 def do_process_event(
-    cache_key,
-    start_time,
-    event_id,
-    process_task,
-    data=None,
-    data_has_changed=None,
-    from_symbolicate=False,
-):
+    cache_key: str,
+    start_time: Optional[int],
+    event_id: Optional[str],
+    process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None],
+    data: Optional[Event] = None,
+    data_has_changed: bool = False,
+    from_symbolicate: bool = False,
+) -> None:
     from sentry.plugins.base import plugins
 
     if data is None:
@@ -237,7 +267,7 @@ def do_process_event(
 
     event_id = data["event_id"]
 
-    def _continue_to_save_event():
+    def _continue_to_save_event() -> None:
         from_reprocessing = process_task is process_event_from_reprocessing
         submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data)
 
@@ -259,7 +289,7 @@ def _continue_to_save_event():
             "organization", Organization.objects.get_from_cache(id=project.organization_id)
         )
 
-    has_changed = bool(data_has_changed)
+    has_changed = data_has_changed
 
     with sentry_sdk.start_span(op="tasks.store.process_event.get_reprocessing_revision"):
         # Fetch the reprocessing revision
@@ -376,13 +406,19 @@ def _continue_to_save_event():
         return _continue_to_save_event()
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.process_event",
     queue="events.process_event",
     time_limit=65,
     soft_time_limit=60,
 )
-def process_event(cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs):
+def process_event(
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data_has_changed: bool = False,
+    **kwargs: Any,
+) -> None:
     """
     Handles event processing (for those events that need it)
 
@@ -402,15 +438,19 @@ def process_event(cache_key, start_time=None, event_id=None, data_has_changed=No
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.process_event_from_reprocessing",
     queue="events.reprocessing.process_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def process_event_from_reprocessing(
-    cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs
-):
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data_has_changed: bool = False,
+    **kwargs: Any,
+) -> None:
     return do_process_event(
         cache_key=cache_key,
         start_time=start_time,
@@ -420,7 +460,9 @@ def process_event_from_reprocessing(
     )
 
 
-def delete_raw_event(project_id, event_id, allow_hint_clear=False):
+def delete_raw_event(
+    project_id: int, event_id: Optional[str], allow_hint_clear: bool = False
+) -> None:
     set_current_event_project(project_id)
 
     if event_id is None:
@@ -448,8 +490,14 @@ def delete_raw_event(project_id, event_id, allow_hint_clear=False):
 
 
 def create_failed_event(
-    cache_key, data, project_id, issues, event_id, start_time=None, reprocessing_rev=None
-):
+    cache_key: str,
+    data: Optional[Event],
+    project_id: int,
+    issues: List[Dict[str, str]],
+    event_id: Optional[str],
+    start_time: Optional[int] = None,
+    reprocessing_rev: Any = None,
+) -> bool:
     """If processing failed we put the original data from the cache into a
     raw event.  Returns `True` if a failed event was inserted
     """
@@ -538,8 +586,13 @@ def create_failed_event(
 
 
 def _do_save_event(
-    cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs
-):
+    cache_key: Optional[str] = None,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project_id: Optional[int] = None,
+    **kwargs: Any,
+) -> None:
     """
     Saves an event to the database.
     """
@@ -643,7 +696,9 @@ def _do_save_event(
         time_synthetic_monitoring_event(data, project_id, start_time)
 
 
-def time_synthetic_monitoring_event(data, project_id, start_time):
+def time_synthetic_monitoring_event(
+    data: Event, project_id: int, start_time: Optional[int]
+) -> bool:
     """
     For special events produced by the recurring synthetic monitoring
     functions, emit timing metrics for:
@@ -691,13 +746,18 @@ def time_synthetic_monitoring_event(data, project_id, start_time):
     return True
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.save_event",
     queue="events.save_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def save_event(
-    cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs
-):
+    cache_key: Optional[str] = None,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project_id: Optional[int] = None,
+    **kwargs: Any,
+) -> None:
     _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs)
diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py
index d3733e3d9b9120..fdbe93a17446bc 100644
--- a/src/sentry/tasks/symbolication.py
+++ b/src/sentry/tasks/symbolication.py
@@ -1,12 +1,14 @@
 import logging
 import random
 from time import sleep, time
+from typing import Any, Callable, Optional
 
 import sentry_sdk
 from django.conf import settings
 
 from sentry import options
 from sentry.eventstore import processing
+from sentry.eventstore.processing.base import Event
 from sentry.killswitches import killswitch_matches_context
 from sentry.processing import realtime_metrics
 from sentry.tasks import store
@@ -21,7 +23,7 @@
 # Is reprocessing on or off by default?
 REPROCESSING_DEFAULT = False
 
-SYMBOLICATOR_MAX_RETRY_AFTER = settings.SYMBOLICATOR_MAX_RETRY_AFTER
+SYMBOLICATOR_MAX_RETRY_AFTER: int = settings.SYMBOLICATOR_MAX_RETRY_AFTER
 
 # The maximum number of times an event will be moved between the normal
 # and low priority queues
@@ -38,7 +40,7 @@
 
 
 class RetrySymbolication(Exception):
-    def __init__(self, retry_after=None):
+    def __init__(self, retry_after: Optional[int] = None) -> None:
         self.retry_after = retry_after
 
 
@@ -77,8 +79,14 @@ def should_demote_symbolication(project_id: int) -> bool:
 
 
 def submit_symbolicate(
-    is_low_priority, from_reprocessing, cache_key, event_id, start_time, data, queue_switches=0
-):
+    is_low_priority: bool,
+    from_reprocessing: bool,
+    cache_key: str,
+    event_id: Optional[str],
+    start_time: Optional[int],
+    data: Optional[Event],
+    queue_switches: int = 0,
+) -> None:
     if is_low_priority:
         task = (
             symbolicate_event_from_reprocessing_low_priority
@@ -98,8 +106,13 @@ def submit_symbolicate(
 
 
 def _do_symbolicate_event(
-    cache_key, start_time, event_id, symbolicate_task, data=None, queue_switches=0
-):
+    cache_key: str,
+    start_time: Optional[int],
+    event_id: Optional[str],
+    symbolicate_task: Callable[[Optional[str], Optional[int], Optional[str]], None],
+    data: Optional[Event] = None,
+    queue_switches: int = 0,
+) -> None:
     from sentry.lang.native.processing import get_symbolication_function
 
     if data is None:
@@ -148,7 +161,7 @@ def _do_symbolicate_event(
         )
         return
 
-    def _continue_to_process_event():
+    def _continue_to_process_event() -> None:
        process_task = (
             store.process_event_from_reprocessing if from_reprocessing else store.process_event
         )
@@ -244,7 +257,12 @@ def _continue_to_process_event():
                         "tasks.store.symbolicate_event.retry",
                         tags={"symbolication_function": symbolication_function_name},
                     )
-                    sleep(min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER))
+                    sleep_time = (
+                        SYMBOLICATOR_MAX_RETRY_AFTER
+                        if e.retry_after is None
+                        else min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER)
+                    )
+                    sleep(sleep_time)
                     continue
             except Exception:
                 metrics.incr(
@@ -283,7 +301,7 @@ def _continue_to_process_event():
     return _continue_to_process_event()
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.symbolicate_event",
     queue="events.symbolicate_event",
     time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30,
@@ -291,8 +309,13 @@ def _continue_to_process_event():
     acks_late=True,
 )
 def symbolicate_event(
-    cache_key, start_time=None, event_id=None, data=None, queue_switches=0, **kwargs
-):
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data: Optional[Event] = None,
+    queue_switches: int = 0,
+    **kwargs: Any,
+) -> None:
     """
     Handles event symbolication using the external service: symbolicator.
 
@@ -310,7 +333,7 @@ def symbolicate_event(
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.symbolicate_event_low_priority",
     queue="events.symbolicate_event_low_priority",
     time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30,
@@ -318,8 +341,13 @@ def symbolicate_event(
     acks_late=True,
 )
 def symbolicate_event_low_priority(
-    cache_key, start_time=None, event_id=None, data=None, queue_switches=0, **kwargs
-):
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data: Optional[Event] = None,
+    queue_switches: int = 0,
+    **kwargs: Any,
+) -> None:
     """
     Handles event symbolication using the external service: symbolicator.
 
@@ -340,7 +368,7 @@ def symbolicate_event_low_priority(
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.symbolicate_event_from_reprocessing",
     queue="events.reprocessing.symbolicate_event",
     time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30,
@@ -348,8 +376,13 @@ def symbolicate_event_low_priority(
     acks_late=True,
 )
 def symbolicate_event_from_reprocessing(
-    cache_key, start_time=None, event_id=None, data=None, queue_switches=0, **kwargs
-):
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data: Optional[Event] = None,
+    queue_switches: int = 0,
+    **kwargs: Any,
+) -> None:
     return _do_symbolicate_event(
         cache_key=cache_key,
         start_time=start_time,
@@ -360,7 +393,7 @@ def symbolicate_event_from_reprocessing(
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.symbolicate_event_from_reprocessing_low_priority",
     queue="events.reprocessing.symbolicate_event_low_priority",
     time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30,
@@ -368,8 +401,13 @@ def symbolicate_event_from_reprocessing(
     acks_late=True,
 )
 def symbolicate_event_from_reprocessing_low_priority(
-    cache_key, start_time=None, event_id=None, data=None, queue_switches=0, **kwargs
-):
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data: Optional[Event] = None,
+    queue_switches: int = 0,
+    **kwargs: Any,
+) -> None:
     return _do_symbolicate_event(
         cache_key=cache_key,
         start_time=start_time,
diff --git a/tests/sentry/tasks/test_store.py b/tests/sentry/tasks/test_store.py
index e1b889544404fb..d4409350352eb1 100644
--- a/tests/sentry/tasks/test_store.py
+++ b/tests/sentry/tasks/test_store.py
@@ -103,7 +103,7 @@ def test_move_to_process_event(
         "extra": {"foo": "bar"},
     }
 
-    preprocess_event(data=data)
+    preprocess_event(cache_key="", data=data)
 
     assert mock_symbolicate_event.delay.call_count == 0
     assert mock_process_event.delay.call_count == 1
@@ -123,7 +123,7 @@ def test_move_to_save_event(
         "extra": {"foo": "bar"},
     }
 
-    preprocess_event(data=data)
+    preprocess_event(cache_key="", data=data)
 
     assert mock_symbolicate_event.delay.call_count == 0
     assert mock_process_event.delay.call_count == 0
diff --git a/tests/sentry/tasks/test_symbolication.py b/tests/sentry/tasks/test_symbolication.py
index 5568c2c930693c..c0d42298c3e319 100644
--- a/tests/sentry/tasks/test_symbolication.py
+++ b/tests/sentry/tasks/test_symbolication.py
@@ -103,7 +103,7 @@ def test_move_to_symbolicate_event(
         "extra": {"foo": "bar"},
     }
 
-    preprocess_event(data=data)
+    preprocess_event(cache_key="", data=data)
 
     assert mock_symbolicate_event.delay.call_count == 1
     assert mock_process_event.delay.call_count == 0
@@ -129,7 +129,7 @@ def test_move_to_symbolicate_event_low_priority(
         "extra": {"foo": "bar"},
     }
 
-    preprocess_event(data=data)
+    preprocess_event(cache_key="", data=data)
 
     assert mock_symbolicate_event_low_priority.delay.call_count == 1
     assert mock_symbolicate_event.delay.call_count == 0
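
A note on the one behavioral change buried in the typing work: `RetrySymbolication.retry_after` defaults to `None`, so the old `sleep(min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER))` could raise a `TypeError` (`'<' not supported between instances of 'int' and 'NoneType'`). The patched code falls back to the cap when no retry hint is present. The sketch below reproduces that clamping logic in isolation; the helper name `clamped_retry_after` and the cap value are illustrative, not part of the patch.

```python
from typing import Optional

# Stand-in for settings.SYMBOLICATOR_MAX_RETRY_AFTER (illustrative value).
SYMBOLICATOR_MAX_RETRY_AFTER = 5


def clamped_retry_after(retry_after: Optional[int]) -> int:
    """Mirror the patched sleep computation: use the cap when the
    symbolicator sent no retry hint, otherwise clamp to the cap."""
    if retry_after is None:
        return SYMBOLICATOR_MAX_RETRY_AFTER
    return min(retry_after, SYMBOLICATOR_MAX_RETRY_AFTER)


assert clamped_retry_after(None) == 5  # old code: min(None, 5) -> TypeError
assert clamped_retry_after(2) == 2     # short waits are honored as-is
assert clamped_retry_after(30) == 5    # long waits are clamped to the cap
```

In the same spirit, `has_changed = bool(data_has_changed)` became a plain assignment because the parameter is now typed `bool = False` instead of defaulting to `None`, making the coercion redundant.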
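The recurring `# type: ignore` on `@instrumented_task` and `@metrics.wraps` is presumably needed because those decorator factories are themselves unannotated: under mypy's `disallow_untyped_decorators` option (an assumption about the strict per-module settings this mypy.ini applies to newly listed files; the relevant block is outside the hunk above), an untyped decorator on an annotated function is an error. A minimal reproduction with a stand-in decorator:

```python
def instrumented_task(**options):  # unannotated factory: an "untyped" decorator
    def decorator(func):
        return func

    return decorator


# With disallow_untyped_decorators = True, mypy reports on the decorated def:
#   error: Untyped decorator makes function "save_event" untyped
# The inline ignores added throughout the patch suppress exactly this.
@instrumented_task(name="example.save_event")  # type: ignore
def save_event(cache_key: str) -> None:
    ...
```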