From 5f68c4598a0e8067225674fdee70428556a29d13 Mon Sep 17 00:00:00 2001 From: Betty Da Date: Wed, 29 Sep 2021 19:51:50 -0400 Subject: [PATCH 01/21] ref: Move all symbolicator-related code in store.py to a separate file ingest can stop being pulled in for reviews on PRs that do not require their feedback or should not require their feedback NATIVE-259 --- .github/CODEOWNERS | 5 +- src/sentry/tasks/store.py | 292 +--------------------------- src/sentry/tasks/symbolication.py | 303 ++++++++++++++++++++++++++++++ 3 files changed, 312 insertions(+), 288 deletions(-) create mode 100644 src/sentry/tasks/symbolication.py diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index af44a0b6ad80fd..7a88a61dbbc485 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -249,11 +249,12 @@ build-utils/ @getsentry/owners-js-build /src/sentry/api/endpoints/chunk.py @getsentry/owners-native /src/sentry/api/endpoints/project_app_store_connect_credentials.py @getsentry/owners-native /src/sentry/lang/native/ @getsentry/owners-native -src/sentry/processing/realtime_metrics/ @getsentry/owners-native +/src/sentry/processing/realtime_metrics/ @getsentry/owners-native /src/sentry/tasks/app_store_connect.py @getsentry/owners-native /src/sentry/tasks/assemble.py @getsentry/owners-native /src/sentry/tasks/low_priority_symbolication.py @getsentry/owners-native -/tests/sentry/tasks/test_low_priority_symbolication.py @getsentry/owners-native +/src/sentry/tasks/symbolication.py @getsentry/owners-native /src/sentry/utils/appleconnect/ @getsentry/owners-native +/tests/sentry/tasks/test_low_priority_symbolication.py @getsentry/owners-native ## End of Native ## diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 22667d18d6181b..5f049d76b02f80 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -1,7 +1,6 @@ import logging -import random from datetime import datetime -from time import sleep, time +from time import time import sentry_sdk from django.conf import settings @@ -15,7 +14,6 @@ from sentry.eventstore.processing import event_processing_store from sentry.killswitches import killswitch_matches_context from sentry.models import Activity, Organization, Project, ProjectOption -from sentry.processing import realtime_metrics from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces from sentry.tasks.base import instrumented_task from sentry.utils import metrics @@ -30,18 +28,11 @@ # Is reprocessing on or off by default? REPROCESSING_DEFAULT = False -SYMBOLICATOR_MAX_RETRY_AFTER = settings.SYMBOLICATOR_MAX_RETRY_AFTER - class RetryProcessing(Exception): pass -class RetrySymbolication(Exception): - def __init__(self, retry_after=None): - self.retry_after = retry_after - - @metrics.wraps("should_process") def should_process(data): """Quick check if processing is needed at all.""" @@ -84,41 +75,6 @@ def submit_process( ) -def should_demote_symbolication(project_id): - """ - Determines whether a project's symbolication events should be pushed to the low priority queue. 
- """ - always_lowpri = killswitch_matches_context( - "store.symbolicate-event-lpq-always", - { - "project_id": project_id, - }, - ) - never_lowpri = killswitch_matches_context( - "store.symbolicate-event-lpq-never", - { - "project_id": project_id, - }, - ) - return not never_lowpri and always_lowpri - - -def submit_symbolicate(project, from_reprocessing, cache_key, event_id, start_time, data): - task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event - task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) - - -def submit_symbolicate_low_priority( - project, from_reprocessing, cache_key, event_id, start_time, data -): - task = ( - symbolicate_event_from_reprocessing_low_priority - if from_reprocessing - else symbolicate_event_low_priority - ) - task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) - - def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data): if cache_key: data = None @@ -136,6 +92,11 @@ def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project): from sentry.lang.native.processing import should_process_with_symbolicator + from sentry.tasks.symbolication import ( + should_demote_symbolication, + submit_symbolicate, + submit_symbolicate_low_priority, + ) if cache_key and data is None: data = event_processing_store.get(cache_key) @@ -229,247 +190,6 @@ def preprocess_event_from_reprocessing( ) -def _do_symbolicate_event(cache_key, start_time, event_id, symbolicate_task, data=None): - from sentry.lang.native.processing import get_symbolication_function - - if data is None: - data = event_processing_store.get(cache_key) - - if data is None: - metrics.incr( - "events.failed", tags={"reason": "cache", "stage": "symbolicate"}, skip_internal=False - ) - error_logger.error("symbolicate.failed.empty", extra={"cache_key": cache_key}) - return - - data = CanonicalKeyDict(data) - - project_id = data["project"] - set_current_event_project(project_id) - - event_id = data["event_id"] - - from_reprocessing = ( - symbolicate_task is symbolicate_event_from_reprocessing - or symbolicate_task is symbolicate_event_from_reprocessing_low_priority - ) - - def _continue_to_process_event(): - process_task = process_event_from_reprocessing if from_reprocessing else process_event - _do_process_event( - cache_key=cache_key, - start_time=start_time, - event_id=event_id, - process_task=process_task, - data=data, - data_has_changed=has_changed, - from_symbolicate=True, - ) - - symbolication_function = get_symbolication_function(data) - symbolication_function_name = getattr(symbolication_function, "__name__", "none") - - if killswitch_matches_context( - "store.load-shed-symbolicate-event-projects", - { - "project_id": project_id, - "event_id": event_id, - "platform": data.get("platform") or "null", - "symbolication_function": symbolication_function_name, - }, - ): - return _continue_to_process_event() - - has_changed = False - - symbolication_start_time = time() - - submission_ratio = options.get("symbolicate-event.low-priority.metrics.submission-rate") - submit_realtime_metrics = not from_reprocessing and random.random() < submission_ratio - - if submit_realtime_metrics: - with sentry_sdk.start_span(op="tasks.store.symbolicate_event.low_priority.metrics.counter"): - timestamp = int(symbolication_start_time) - try: - realtime_metrics.increment_project_event_counter(project_id, timestamp) - except 
Exception as e: - sentry_sdk.capture_exception(e) - - with sentry_sdk.start_span(op="tasks.store.symbolicate_event.symbolication") as span: - span.set_data("symbolication_function", symbolication_function_name) - with metrics.timer( - "tasks.store.symbolicate_event.symbolication", - tags={"symbolication_function": symbolication_function_name}, - ): - while True: - try: - with sentry_sdk.start_span( - op="tasks.store.symbolicate_event.%s" % symbolication_function_name - ) as span: - symbolicated_data = symbolication_function(data) - span.set_data("symbolicated_data", bool(symbolicated_data)) - - if symbolicated_data: - data = symbolicated_data - has_changed = True - - break - except RetrySymbolication as e: - if ( - time() - symbolication_start_time - ) > settings.SYMBOLICATOR_PROCESS_EVENT_WARN_TIMEOUT: - error_logger.warning( - "symbolicate.slow", - extra={"project_id": project_id, "event_id": event_id}, - ) - if ( - time() - symbolication_start_time - ) > settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT: - # Do not drop event but actually continue with rest of pipeline - # (persisting unsymbolicated event) - metrics.incr( - "tasks.store.symbolicate_event.fatal", - tags={ - "reason": "timeout", - "symbolication_function": symbolication_function_name, - }, - ) - error_logger.exception( - "symbolicate.failed.infinite_retry", - extra={"project_id": project_id, "event_id": event_id}, - ) - data.setdefault("_metrics", {})["flag.processing.error"] = True - data.setdefault("_metrics", {})["flag.processing.fatal"] = True - has_changed = True - break - else: - # sleep for `retry_after` but max 5 seconds and try again - metrics.incr( - "tasks.store.symbolicate_event.retry", - tags={"symbolication_function": symbolication_function_name}, - ) - sleep(min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER)) - continue - except Exception: - metrics.incr( - "tasks.store.symbolicate_event.fatal", - tags={ - "reason": "error", - "symbolication_function": symbolication_function_name, - }, - ) - error_logger.exception("tasks.store.symbolicate_event.symbolication") - data.setdefault("_metrics", {})["flag.processing.error"] = True - data.setdefault("_metrics", {})["flag.processing.fatal"] = True - has_changed = True - break - - if submit_realtime_metrics: - with sentry_sdk.start_span( - op="tasks.store.symbolicate_event.low_priority.metrics.histogram" - ): - symbolication_duration = int(time() - symbolication_start_time) - try: - realtime_metrics.increment_project_duration_counter( - project_id, timestamp, symbolication_duration - ) - except Exception as e: - sentry_sdk.capture_exception(e) - - # We cannot persist canonical types in the cache, so we need to - # downgrade this. - if isinstance(data, CANONICAL_TYPES): - data = dict(data.items()) - - if has_changed: - cache_key = event_processing_store.store(data) - - return _continue_to_process_event() - - -@instrumented_task( - name="sentry.tasks.store.symbolicate_event", - queue="events.symbolicate_event", - time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, - soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, - acks_late=True, -) -def symbolicate_event(cache_key, start_time=None, event_id=None, **kwargs): - """ - Handles event symbolication using the external service: symbolicator. 
- - :param string cache_key: the cache key for the event data - :param int start_time: the timestamp when the event was ingested - :param string event_id: the event identifier - """ - return _do_symbolicate_event( - cache_key=cache_key, - start_time=start_time, - event_id=event_id, - symbolicate_task=symbolicate_event, - ) - - -@instrumented_task( - name="sentry.tasks.store.symbolicate_event_low_priority", - queue="events.symbolicate_event_low_priority", - time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, - soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, - acks_late=True, -) -def symbolicate_event_low_priority(cache_key, start_time=None, event_id=None, **kwargs): - """ - Handles event symbolication using the external service: symbolicator. - - This puts the task on the low priority queue. Projects whose symbolication - events misbehave get sent there to protect the main queue. - - :param string cache_key: the cache key for the event data - :param int start_time: the timestamp when the event was ingested - :param string event_id: the event identifier - """ - return _do_symbolicate_event( - cache_key=cache_key, - start_time=start_time, - event_id=event_id, - symbolicate_task=symbolicate_event_low_priority, - ) - - -@instrumented_task( - name="sentry.tasks.store.symbolicate_event_from_reprocessing", - queue="events.reprocessing.symbolicate_event", - time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, - soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, - acks_late=True, -) -def symbolicate_event_from_reprocessing(cache_key, start_time=None, event_id=None, **kwargs): - return _do_symbolicate_event( - cache_key=cache_key, - start_time=start_time, - event_id=event_id, - symbolicate_task=symbolicate_event_from_reprocessing, - ) - - -@instrumented_task( - name="sentry.tasks.store.symbolicate_event_from_reprocessing_low_priority", - queue="events.reprocessing.symbolicate_event_low_priority", - time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, - soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, - acks_late=True, -) -def symbolicate_event_from_reprocessing_low_priority( - cache_key, start_time=None, event_id=None, **kwargs -): - return _do_symbolicate_event( - cache_key=cache_key, - start_time=start_time, - event_id=event_id, - symbolicate_task=symbolicate_event_from_reprocessing_low_priority, - ) - - @instrumented_task( name="sentry.tasks.store.retry_process_event", queue="sleep", diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py new file mode 100644 index 00000000000000..3136d52dc75bea --- /dev/null +++ b/src/sentry/tasks/symbolication.py @@ -0,0 +1,303 @@ +import logging +import random +from time import sleep, time + +import sentry_sdk +from django.conf import settings + +from sentry import options +from sentry.eventstore.processing import event_processing_store +from sentry.killswitches import killswitch_matches_context +from sentry.processing import realtime_metrics +from sentry.tasks.base import instrumented_task +from sentry.utils import metrics +from sentry.utils.canonical import CANONICAL_TYPES, CanonicalKeyDict +from sentry.utils.sdk import set_current_event_project + +error_logger = logging.getLogger("sentry.errors.events") +info_logger = logging.getLogger("sentry.symbolication") + +# Is reprocessing on or off by default? 
+REPROCESSING_DEFAULT = False + +SYMBOLICATOR_MAX_RETRY_AFTER = settings.SYMBOLICATOR_MAX_RETRY_AFTER + + +class RetrySymbolication(Exception): + def __init__(self, retry_after=None): + self.retry_after = retry_after + + +def should_demote_symbolication(project_id): + """ + Determines whether a project's symbolication events should be pushed to the low priority queue. + """ + always_lowpri = killswitch_matches_context( + "store.symbolicate-event-lpq-always", + { + "project_id": project_id, + }, + ) + never_lowpri = killswitch_matches_context( + "store.symbolicate-event-lpq-never", + { + "project_id": project_id, + }, + ) + return not never_lowpri and always_lowpri + + +def submit_symbolicate(project, from_reprocessing, cache_key, event_id, start_time, data): + task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event + task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) + + +def submit_symbolicate_low_priority( + project, from_reprocessing, cache_key, event_id, start_time, data +): + task = ( + symbolicate_event_from_reprocessing_low_priority + if from_reprocessing + else symbolicate_event_low_priority + ) + task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) + + +def _do_symbolicate_event(cache_key, start_time, event_id, symbolicate_task, data=None): + from sentry.lang.native.processing import get_symbolication_function + from sentry.tasks.store import _do_process_event, process_event, process_event_from_reprocessing + + if data is None: + data = event_processing_store.get(cache_key) + + if data is None: + metrics.incr( + "events.failed", tags={"reason": "cache", "stage": "symbolicate"}, skip_internal=False + ) + error_logger.error("symbolicate.failed.empty", extra={"cache_key": cache_key}) + return + + data = CanonicalKeyDict(data) + + project_id = data["project"] + set_current_event_project(project_id) + + event_id = data["event_id"] + + from_reprocessing = symbolicate_task is symbolicate_event_from_reprocessing + + def _continue_to_process_event(): + process_task = process_event_from_reprocessing if from_reprocessing else process_event + # TODO: this uses a private "store" function. is there a way to not do this? 
+ _do_process_event( + cache_key=cache_key, + start_time=start_time, + event_id=event_id, + process_task=process_task, + data=data, + data_has_changed=has_changed, + from_symbolicate=True, + ) + + symbolication_function = get_symbolication_function(data) + symbolication_function_name = getattr(symbolication_function, "__name__", "none") + + if killswitch_matches_context( + "store.load-shed-symbolicate-event-projects", + { + "project_id": project_id, + "event_id": event_id, + "platform": data.get("platform") or "null", + "symbolication_function": symbolication_function_name, + }, + ): + return _continue_to_process_event() + + has_changed = False + + symbolication_start_time = time() + + submission_ratio = options.get("symbolicate-event.low-priority.metrics.submission-rate") + submit_realtime_metrics = not from_reprocessing and random.random() < submission_ratio + + if submit_realtime_metrics: + with sentry_sdk.start_span(op="tasks.store.symbolicate_event.low_priority.metrics.counter"): + timestamp = int(symbolication_start_time) + try: + realtime_metrics.increment_project_event_counter(project_id, timestamp) + except Exception as e: + sentry_sdk.capture_exception(e) + + with sentry_sdk.start_span(op="tasks.store.symbolicate_event.symbolication") as span: + span.set_data("symbolication_function", symbolication_function_name) + with metrics.timer( + "tasks.store.symbolicate_event.symbolication", + tags={"symbolication_function": symbolication_function_name}, + ): + while True: + try: + with sentry_sdk.start_span( + op="tasks.store.symbolicate_event.%s" % symbolication_function_name + ) as span: + symbolicated_data = symbolication_function(data) + span.set_data("symbolicated_data", bool(symbolicated_data)) + + if symbolicated_data: + data = symbolicated_data + has_changed = True + + break + except RetrySymbolication as e: + if ( + time() - symbolication_start_time + ) > settings.SYMBOLICATOR_PROCESS_EVENT_WARN_TIMEOUT: + error_logger.warning( + "symbolicate.slow", + extra={"project_id": project_id, "event_id": event_id}, + ) + if ( + time() - symbolication_start_time + ) > settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT: + # Do not drop event but actually continue with rest of pipeline + # (persisting unsymbolicated event) + metrics.incr( + "tasks.store.symbolicate_event.fatal", + tags={ + "reason": "timeout", + "symbolication_function": symbolication_function_name, + }, + ) + error_logger.exception( + "symbolicate.failed.infinite_retry", + extra={"project_id": project_id, "event_id": event_id}, + ) + data.setdefault("_metrics", {})["flag.processing.error"] = True + data.setdefault("_metrics", {})["flag.processing.fatal"] = True + has_changed = True + break + else: + # sleep for `retry_after` but max 5 seconds and try again + metrics.incr( + "tasks.store.symbolicate_event.retry", + tags={"symbolication_function": symbolication_function_name}, + ) + sleep(min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER)) + continue + except Exception: + metrics.incr( + "tasks.store.symbolicate_event.fatal", + tags={ + "reason": "error", + "symbolication_function": symbolication_function_name, + }, + ) + error_logger.exception("tasks.store.symbolicate_event.symbolication") + data.setdefault("_metrics", {})["flag.processing.error"] = True + data.setdefault("_metrics", {})["flag.processing.fatal"] = True + has_changed = True + break + + if submit_realtime_metrics: + with sentry_sdk.start_span( + op="tasks.store.symbolicate_event.low_priority.metrics.histogram" + ): + symbolication_duration = int(time() - 
symbolication_start_time) + try: + realtime_metrics.increment_project_duration_counter( + project_id, timestamp, symbolication_duration + ) + except Exception as e: + sentry_sdk.capture_exception(e) + + # We cannot persist canonical types in the cache, so we need to + # downgrade this. + if isinstance(data, CANONICAL_TYPES): + data = dict(data.items()) + + if has_changed: + cache_key = event_processing_store.store(data) + + return _continue_to_process_event() + + +@instrumented_task( + name="sentry.tasks.store.symbolicate_event", + queue="events.symbolicate_event", + time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, + soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, + acks_late=True, +) +def symbolicate_event(cache_key, start_time=None, event_id=None, **kwargs): + """ + Handles event symbolication using the external service: symbolicator. + + :param string cache_key: the cache key for the event data + :param int start_time: the timestamp when the event was ingested + :param string event_id: the event identifier + """ + return _do_symbolicate_event( + cache_key=cache_key, + start_time=start_time, + event_id=event_id, + symbolicate_task=symbolicate_event, + ) + + +@instrumented_task( + name="sentry.tasks.store.symbolicate_event_low_priority", + queue="events.symbolicate_event_low_priority", + time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, + soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, + acks_late=True, +) +def symbolicate_event_low_priority(cache_key, start_time=None, event_id=None, **kwargs): + """ + Handles event symbolication using the external service: symbolicator. + + This puts the task on the low priority queue. Projects whose symbolication + events misbehave get sent there to protect the main queue. 
+ + :param string cache_key: the cache key for the event data + :param int start_time: the timestamp when the event was ingested + :param string event_id: the event identifier + """ + return _do_symbolicate_event( + cache_key=cache_key, + start_time=start_time, + event_id=event_id, + symbolicate_task=symbolicate_event_low_priority, + ) + + +@instrumented_task( + name="sentry.tasks.store.symbolicate_event_from_reprocessing", + queue="events.reprocessing.symbolicate_event", + time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, + soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, + acks_late=True, +) +def symbolicate_event_from_reprocessing(cache_key, start_time=None, event_id=None, **kwargs): + return _do_symbolicate_event( + cache_key=cache_key, + start_time=start_time, + event_id=event_id, + symbolicate_task=symbolicate_event_from_reprocessing, + ) + + +@instrumented_task( + name="sentry.tasks.store.symbolicate_event_from_reprocessing_low_priority", + queue="events.reprocessing.symbolicate_event_low_priority", + time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, + soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, + acks_late=True, +) +def symbolicate_event_from_reprocessing_low_priority( + cache_key, start_time=None, event_id=None, **kwargs +): + return _do_symbolicate_event( + cache_key=cache_key, + start_time=start_time, + event_id=event_id, + symbolicate_task=symbolicate_event_from_reprocessing_low_priority, + ) From a2212cd3f2cae4411c7c06bcbcecf03ba7bac2d7 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 1 Oct 2021 17:44:41 +0200 Subject: [PATCH 02/21] Fix tests --- src/sentry/tasks/store.py | 18 +++++++++--------- src/sentry/tasks/symbolication.py | 6 +++--- tests/sentry/tasks/test_store.py | 9 ++++----- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 5f049d76b02f80..f45a4dc59457b3 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -11,7 +11,7 @@ from sentry.attachments import attachment_cache from sentry.constants import DEFAULT_STORE_NORMALIZER_ARGS from sentry.datascrubbing import scrub_data -from sentry.eventstore.processing import event_processing_store +from sentry.eventstore import processing from sentry.killswitches import killswitch_matches_context from sentry.models import Activity, Organization, Project, ProjectOption from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces @@ -99,7 +99,7 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr ) if cache_key and data is None: - data = event_processing_store.get(cache_key) + data = processing.event_processing_store.get(cache_key) if data is None: metrics.incr("events.failed", tags={"reason": "cache", "stage": "pre"}, skip_internal=False) @@ -226,7 +226,7 @@ def _do_process_event( from sentry.plugins.base import plugins if data is None: - data = event_processing_store.get(cache_key) + data = processing.event_processing_store.get(cache_key) if data is None: metrics.incr( @@ -376,7 +376,7 @@ def _continue_to_save_event(): _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project) return - cache_key = event_processing_store.store(data) + cache_key = processing.event_processing_store.store(data) return _continue_to_save_event() @@ -511,7 +511,7 @@ def create_failed_event( # from the last processing step because we do not want any # modifications to take place. 
delete_raw_event(project_id, event_id) - data = event_processing_store.get(cache_key) + data = processing.event_processing_store.get(cache_key) if data is None: metrics.incr("events.failed", tags={"reason": "cache", "stage": "raw"}, skip_internal=False) @@ -537,7 +537,7 @@ def create_failed_event( data=issue["data"], ) - event_processing_store.delete_by_key(cache_key) + processing.event_processing_store.delete_by_key(cache_key) return True @@ -557,7 +557,7 @@ def _do_save_event( if cache_key and data is None: with metrics.timer("tasks.store.do_save_event.get_cache") as metric_tags: - data = event_processing_store.get(cache_key) + data = processing.event_processing_store.get(cache_key) if data is not None: metric_tags["event_type"] = event_type = data.get("type") or "none" @@ -620,12 +620,12 @@ def _do_save_event( if isinstance(data, CANONICAL_TYPES): data = dict(data.items()) with metrics.timer("tasks.store.do_save_event.write_processing_cache"): - event_processing_store.store(data) + processing.event_processing_store.store(data) except HashDiscarded: # Delete the event payload from cache since it won't show up in post-processing. if cache_key: with metrics.timer("tasks.store.do_save_event.delete_cache"): - event_processing_store.delete_by_key(cache_key) + processing.event_processing_store.delete_by_key(cache_key) finally: reprocessing2.mark_event_reprocessed(data) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 3136d52dc75bea..3cecbc5c5eabe7 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -6,7 +6,7 @@ from django.conf import settings from sentry import options -from sentry.eventstore.processing import event_processing_store +from sentry.eventstore import processing from sentry.killswitches import killswitch_matches_context from sentry.processing import realtime_metrics from sentry.tasks.base import instrumented_task @@ -68,7 +68,7 @@ def _do_symbolicate_event(cache_key, start_time, event_id, symbolicate_task, dat from sentry.tasks.store import _do_process_event, process_event, process_event_from_reprocessing if data is None: - data = event_processing_store.get(cache_key) + data = processing.event_processing_store.get(cache_key) if data is None: metrics.incr( @@ -215,7 +215,7 @@ def _continue_to_process_event(): data = dict(data.items()) if has_changed: - cache_key = event_processing_store.store(data) + cache_key = processing.event_processing_store.store(data) return _continue_to_process_event() diff --git a/tests/sentry/tasks/test_store.py b/tests/sentry/tasks/test_store.py index f80788f8318ad4..20e8a29725e3ad 100644 --- a/tests/sentry/tasks/test_store.py +++ b/tests/sentry/tasks/test_store.py @@ -10,10 +10,9 @@ preprocess_event, process_event, save_event, - should_demote_symbolication, - symbolicate_event, time_synthetic_monitoring_event, ) +from sentry.tasks.symbolication import should_demote_symbolication, symbolicate_event from sentry.testutils.helpers.options import override_options from sentry.utils.compat import mock @@ -59,13 +58,13 @@ def mock_process_event(): @pytest.fixture def mock_symbolicate_event(): - with mock.patch("sentry.tasks.store.symbolicate_event") as m: + with mock.patch("sentry.tasks.symbolication.symbolicate_event") as m: yield m @pytest.fixture def mock_symbolicate_event_low_priority(): - with mock.patch("sentry.tasks.store.symbolicate_event_low_priority") as m: + with mock.patch("sentry.tasks.symbolication.symbolicate_event_low_priority") as m: yield m @@ -77,7 +76,7 @@ def 
mock_get_symbolication_function(): @pytest.fixture def mock_event_processing_store(): - with mock.patch("sentry.tasks.store.event_processing_store") as m: + with mock.patch("sentry.eventstore.processing.event_processing_store") as m: yield m From 7fd4abc07194e4b0a3266ed0616306e2a4380c1f Mon Sep 17 00:00:00 2001 From: Betty Da Date: Mon, 4 Oct 2021 13:22:14 -0400 Subject: [PATCH 03/21] add new symbolication-specific tasks to celery imports --- src/sentry/conf/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 95a267bb6df287..2f7d55b7d59520 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -578,6 +578,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME(): "sentry.tasks.sentry_apps", "sentry.tasks.servicehooks", "sentry.tasks.store", + "sentry.tasks.symbolication", "sentry.tasks.unmerge", "sentry.tasks.update_user_reports", "sentry.tasks.user_report", From 5242505e02dafaa393a5d81e1644e3bb5897d738 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 1 Oct 2021 14:32:47 +0200 Subject: [PATCH 04/21] ref: Add types to store and symbolication --- mypy.ini | 2 + src/sentry/tasks/store.py | 142 ++++++++++++++++++++---------- src/sentry/tasks/symbolication.py | 76 +++++++++++----- 3 files changed, 154 insertions(+), 66 deletions(-) diff --git a/mypy.ini b/mypy.ini index d20b3466643332..6f13d5bf57df7f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -55,6 +55,8 @@ files = src/sentry/api/bases/external_actor.py, src/sentry/spans/**/*.py, src/sentry/tasks/app_store_connect.py, src/sentry/tasks/low_priority_symbolication.py, + src/sentry/tasks/store.py, + src/sentry/tasks/symbolication.py, src/sentry/tasks/update_user_reports.py, src/sentry/unmerge.py, src/sentry/utils/appleconnect/, diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index f45a4dc59457b3..f95dea99d951a2 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -1,6 +1,7 @@ import logging from datetime import datetime from time import time +from typing import Any, Dict, List, Optional import sentry_sdk from django.conf import settings @@ -33,9 +34,13 @@ class RetryProcessing(Exception): pass -@metrics.wraps("should_process") -def should_process(data): +@metrics.wraps("should_process") # type: ignore +def should_process(data: CanonicalKeyDict) -> bool: """Quick check if processing is needed at all.""" + return _should_process_inner(data) + + +def _should_process_inner(data: CanonicalKeyDict) -> bool: from sentry.plugins.base import plugins if data.get("type") == "transaction": @@ -59,13 +64,12 @@ def should_process(data): def submit_process( - project, - from_reprocessing, - cache_key, - event_id, - start_time, - data_has_changed=None, -): + from_reprocessing: bool, + cache_key: Optional[str], + event_id: Optional[str], + start_time: Optional[int], + data_has_changed: bool = False, +) -> None: task = process_event_from_reprocessing if from_reprocessing else process_event task.delay( cache_key=cache_key, @@ -75,7 +79,14 @@ def submit_process( ) -def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data): +def submit_save_event( + project_id: int, + from_reprocessing: bool, + cache_key: Optional[str], + event_id: Optional[str], + start_time: Optional[int], + data: Optional[Event], +) -> None: if cache_key: data = None @@ -90,7 +101,14 @@ def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_ ) -def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, 
project): +def _do_preprocess_event( + cache_key: Optional[str], + data: Optional[Event], + start_time: Optional[int], + event_id: Optional[str], + process_task: Any, + project: Optional[Project], +) -> None: from sentry.lang.native.processing import should_process_with_symbolicator from sentry.tasks.symbolication import ( should_demote_symbolication, @@ -129,7 +147,6 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr is_low_priority = should_demote_symbolication(project_id) task = submit_symbolicate_low_priority if is_low_priority else submit_symbolicate task( - project, from_reprocessing, cache_key, event_id, @@ -140,7 +157,6 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr if should_process(data): submit_process( - project, from_reprocessing, cache_key, event_id, @@ -152,15 +168,19 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, original_data) -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.preprocess_event", queue="events.preprocess_event", time_limit=65, soft_time_limit=60, ) def preprocess_event( - cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs -): + cache_key: Optional[str] = None, + data: Optional[Event] = None, + start_time: Optional[int] = None, + event_id: Optional[str] = None, + project: Optional[Project] = None, +) -> None: return _do_preprocess_event( cache_key=cache_key, data=data, @@ -171,15 +191,19 @@ def preprocess_event( ) -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.preprocess_event_from_reprocessing", queue="events.reprocessing.preprocess_event", time_limit=65, soft_time_limit=60, ) def preprocess_event_from_reprocessing( - cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs -): + cache_key: Optional[str] = None, + data: Optional[Event] = None, + start_time: Optional[int] = None, + event_id: Optional[str] = None, + project: Optional[Project] = None, +) -> None: return _do_preprocess_event( cache_key=cache_key, data=data, @@ -190,13 +214,13 @@ def preprocess_event_from_reprocessing( ) -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.retry_process_event", queue="sleep", time_limit=(60 * 5) + 5, soft_time_limit=60 * 5, ) -def retry_process_event(process_task_name, task_kwargs, **kwargs): +def retry_process_event(process_task_name: str, task_kwargs: Any, **kwargs: Any) -> None: """ The only purpose of this task is be enqueued with some ETA set. 
This is essentially an implementation of ETAs on top of Celery's existing ETAs, but @@ -215,14 +239,14 @@ def retry_process_event(process_task_name, task_kwargs, **kwargs): def _do_process_event( - cache_key, - start_time, - event_id, - process_task, - data=None, - data_has_changed=None, - from_symbolicate=False, -): + cache_key: Optional[str], + start_time: Optional[int], + event_id: Optional[str], + process_task: Any, + data: Optional[Event] = None, + data_has_changed: bool = False, + from_symbolicate: bool = False, +) -> None: from sentry.plugins.base import plugins if data is None: @@ -242,7 +266,7 @@ def _do_process_event( event_id = data["event_id"] - def _continue_to_save_event(): + def _continue_to_save_event() -> None: from_reprocessing = process_task is process_event_from_reprocessing submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data) @@ -264,7 +288,7 @@ def _continue_to_save_event(): "organization", Organization.objects.get_from_cache(id=project.organization_id) ) - has_changed = bool(data_has_changed) + has_changed = data_has_changed with sentry_sdk.start_span(op="tasks.store.process_event.get_reprocessing_revision"): # Fetch the reprocessing revision @@ -381,13 +405,18 @@ def _continue_to_save_event(): return _continue_to_save_event() -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.process_event", queue="events.process_event", time_limit=65, soft_time_limit=60, ) -def process_event(cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs): +def process_event( + cache_key: Optional[str], + start_time: Optional[int] = None, + event_id: Optional[str] = None, + data_has_changed: bool = False, +) -> None: """ Handles event processing (for those events that need it) @@ -407,15 +436,18 @@ def process_event(cache_key, start_time=None, event_id=None, data_has_changed=No ) -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.process_event_from_reprocessing", queue="events.reprocessing.process_event", time_limit=65, soft_time_limit=60, ) def process_event_from_reprocessing( - cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs -): + cache_key: Optional[str], + start_time: Optional[int] = None, + event_id: Optional[str] = None, + data_has_changed: bool = False, +) -> None: return _do_process_event( cache_key=cache_key, start_time=start_time, @@ -425,7 +457,9 @@ def process_event_from_reprocessing( ) -def delete_raw_event(project_id, event_id, allow_hint_clear=False): +def delete_raw_event( + project_id: int, event_id: Optional[str], allow_hint_clear: bool = False +) -> None: set_current_event_project(project_id) if event_id is None: @@ -453,8 +487,14 @@ def delete_raw_event(project_id, event_id, allow_hint_clear=False): def create_failed_event( - cache_key, data, project_id, issues, event_id, start_time=None, reprocessing_rev=None -): + cache_key: Optional[str], + data: Optional[Any], + project_id: int, + issues: List[Dict[str, str]], + event_id: Optional[str], + start_time: Optional[int] = None, + reprocessing_rev: Any = None, +) -> bool: """If processing failed we put the original data from the cache into a raw event. 
Returns `True` if a failed event was inserted """ @@ -543,8 +583,12 @@ def create_failed_event( def _do_save_event( - cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs -): + cache_key: Optional[str] = None, + data: Optional[Event] = None, + start_time: Optional[int] = None, + event_id: Optional[str] = None, + project_id: Optional[int] = None, +) -> None: """ Saves an event to the database. """ @@ -648,7 +692,9 @@ def _do_save_event( time_synthetic_monitoring_event(data, project_id, start_time) -def time_synthetic_monitoring_event(data, project_id, start_time): +def time_synthetic_monitoring_event( + data: Event, project_id: int, start_time: Optional[int] +) -> bool: """ For special events produced by the recurring synthetic monitoring functions, emit timing metrics for: @@ -696,13 +742,17 @@ def time_synthetic_monitoring_event(data, project_id, start_time): return True -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.save_event", queue="events.save_event", time_limit=65, soft_time_limit=60, ) def save_event( - cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs -): - _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) + cache_key: Optional[str] = None, + data: Optional[Event] = None, + start_time: Optional[int] = None, + event_id: Optional[str] = None, + project_id: Optional[int] = None, +) -> None: + _do_save_event(cache_key, data, start_time, event_id, project_id) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 3cecbc5c5eabe7..246876c6b6446f 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -1,6 +1,7 @@ import logging import random from time import sleep, time +from typing import Any, Optional import sentry_sdk from django.conf import settings @@ -8,7 +9,7 @@ from sentry import options from sentry.eventstore import processing from sentry.killswitches import killswitch_matches_context -from sentry.processing import realtime_metrics +from sentry.processing import realtime_metrics # type: ignore from sentry.tasks.base import instrumented_task from sentry.utils import metrics from sentry.utils.canonical import CANONICAL_TYPES, CanonicalKeyDict @@ -20,15 +21,15 @@ # Is reprocessing on or off by default? REPROCESSING_DEFAULT = False -SYMBOLICATOR_MAX_RETRY_AFTER = settings.SYMBOLICATOR_MAX_RETRY_AFTER +SYMBOLICATOR_MAX_RETRY_AFTER: int = settings.SYMBOLICATOR_MAX_RETRY_AFTER class RetrySymbolication(Exception): - def __init__(self, retry_after=None): + def __init__(self, retry_after: Optional[int] = None) -> None: self.retry_after = retry_after -def should_demote_symbolication(project_id): +def should_demote_symbolication(project_id: int) -> bool: """ Determines whether a project's symbolication events should be pushed to the low priority queue. 
""" @@ -47,14 +48,24 @@ def should_demote_symbolication(project_id): return not never_lowpri and always_lowpri -def submit_symbolicate(project, from_reprocessing, cache_key, event_id, start_time, data): +def submit_symbolicate( + from_reprocessing: bool, + cache_key: Optional[str], + event_id: Optional[str], + start_time: Optional[int], + data: Optional[Event], +) -> None: task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) def submit_symbolicate_low_priority( - project, from_reprocessing, cache_key, event_id, start_time, data -): + from_reprocessing: bool, + cache_key: Optional[str], + event_id: Optional[str], + start_time: Optional[int], + data: Optional[Event], +) -> None: task = ( symbolicate_event_from_reprocessing_low_priority if from_reprocessing @@ -63,7 +74,13 @@ def submit_symbolicate_low_priority( task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) -def _do_symbolicate_event(cache_key, start_time, event_id, symbolicate_task, data=None): +def _do_symbolicate_event( + cache_key: Optional[str], + start_time: Optional[int], + event_id: Optional[str], + symbolicate_task: Any, + data: Optional[Event] = None, +) -> None: from sentry.lang.native.processing import get_symbolication_function from sentry.tasks.store import _do_process_event, process_event, process_event_from_reprocessing @@ -86,7 +103,7 @@ def _do_symbolicate_event(cache_key, start_time, event_id, symbolicate_task, dat from_reprocessing = symbolicate_task is symbolicate_event_from_reprocessing - def _continue_to_process_event(): + def _continue_to_process_event() -> None: process_task = process_event_from_reprocessing if from_reprocessing else process_event # TODO: this uses a private "store" function. is there a way to not do this? _do_process_event( @@ -181,7 +198,12 @@ def _continue_to_process_event(): "tasks.store.symbolicate_event.retry", tags={"symbolication_function": symbolication_function_name}, ) - sleep(min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER)) + sleep_time = ( + min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER) + if e.retry_after + else SYMBOLICATOR_MAX_RETRY_AFTER + ) + sleep(sleep_time) continue except Exception: metrics.incr( @@ -220,14 +242,18 @@ def _continue_to_process_event(): return _continue_to_process_event() -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.symbolicate_event", queue="events.symbolicate_event", time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, acks_late=True, ) -def symbolicate_event(cache_key, start_time=None, event_id=None, **kwargs): +def symbolicate_event( + cache_key: Optional[str], + start_time: Optional[int] = None, + event_id: Optional[str] = None, +) -> None: """ Handles event symbolication using the external service: symbolicator. 
@@ -235,7 +261,7 @@ def symbolicate_event(cache_key, start_time=None, event_id=None, **kwargs): :param int start_time: the timestamp when the event was ingested :param string event_id: the event identifier """ - return _do_symbolicate_event( + _do_symbolicate_event( cache_key=cache_key, start_time=start_time, event_id=event_id, @@ -243,14 +269,18 @@ def symbolicate_event(cache_key, start_time=None, event_id=None, **kwargs): ) -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.symbolicate_event_low_priority", queue="events.symbolicate_event_low_priority", time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, acks_late=True, ) -def symbolicate_event_low_priority(cache_key, start_time=None, event_id=None, **kwargs): +def symbolicate_event_low_priority( + cache_key: Optional[str], + start_time: Optional[int] = None, + event_id: Optional[str] = None, +) -> None: """ Handles event symbolication using the external service: symbolicator. @@ -269,14 +299,18 @@ def symbolicate_event_low_priority(cache_key, start_time=None, event_id=None, ** ) -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.symbolicate_event_from_reprocessing", queue="events.reprocessing.symbolicate_event", time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, soft_time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 20, acks_late=True, ) -def symbolicate_event_from_reprocessing(cache_key, start_time=None, event_id=None, **kwargs): +def symbolicate_event_from_reprocessing( + cache_key: Optional[str], + start_time: Optional[int] = None, + event_id: Optional[str] = None, +) -> None: return _do_symbolicate_event( cache_key=cache_key, start_time=start_time, @@ -285,7 +319,7 @@ def symbolicate_event_from_reprocessing(cache_key, start_time=None, event_id=Non ) -@instrumented_task( +@instrumented_task( # type: ignore name="sentry.tasks.store.symbolicate_event_from_reprocessing_low_priority", queue="events.reprocessing.symbolicate_event_low_priority", time_limit=settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT + 30, @@ -293,8 +327,10 @@ def symbolicate_event_from_reprocessing(cache_key, start_time=None, event_id=Non acks_late=True, ) def symbolicate_event_from_reprocessing_low_priority( - cache_key, start_time=None, event_id=None, **kwargs -): + cache_key: Optional[str], + start_time: Optional[int] = None, + event_id: Optional[str] = None, +) -> None: return _do_symbolicate_event( cache_key=cache_key, start_time=start_time, From 886731545f4444057b4fff707ff55eaa4e65345f Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 1 Oct 2021 17:19:53 +0200 Subject: [PATCH 05/21] s/Event/Any --- src/sentry/tasks/store.py | 18 ++++++++---------- src/sentry/tasks/symbolication.py | 6 +++--- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index f95dea99d951a2..00ae46f0978dea 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -85,7 +85,7 @@ def submit_save_event( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[Event], + data: Optional[Any], ) -> None: if cache_key: data = None @@ -103,7 +103,7 @@ def submit_save_event( def _do_preprocess_event( cache_key: Optional[str], - data: Optional[Event], + data: Optional[Any], start_time: Optional[int], event_id: Optional[str], process_task: Any, @@ -176,7 +176,7 @@ def _do_preprocess_event( ) def 
preprocess_event( cache_key: Optional[str] = None, - data: Optional[Event] = None, + data: Optional[Any] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, @@ -199,7 +199,7 @@ def preprocess_event( ) def preprocess_event_from_reprocessing( cache_key: Optional[str] = None, - data: Optional[Event] = None, + data: Optional[Any] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, @@ -243,7 +243,7 @@ def _do_process_event( start_time: Optional[int], event_id: Optional[str], process_task: Any, - data: Optional[Event] = None, + data: Optional[Any] = None, data_has_changed: bool = False, from_symbolicate: bool = False, ) -> None: @@ -584,7 +584,7 @@ def create_failed_event( def _do_save_event( cache_key: Optional[str] = None, - data: Optional[Event] = None, + data: Optional[Any] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, @@ -692,9 +692,7 @@ def _do_save_event( time_synthetic_monitoring_event(data, project_id, start_time) -def time_synthetic_monitoring_event( - data: Event, project_id: int, start_time: Optional[int] -) -> bool: +def time_synthetic_monitoring_event(data: Any, project_id: int, start_time: Optional[int]) -> bool: """ For special events produced by the recurring synthetic monitoring functions, emit timing metrics for: @@ -750,7 +748,7 @@ def time_synthetic_monitoring_event( ) def save_event( cache_key: Optional[str] = None, - data: Optional[Event] = None, + data: Optional[Any] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 246876c6b6446f..93aff92ea5962a 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -53,7 +53,7 @@ def submit_symbolicate( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[Event], + data: Optional[Any], ) -> None: task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) @@ -64,7 +64,7 @@ def submit_symbolicate_low_priority( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[Event], + data: Optional[Any], ) -> None: task = ( symbolicate_event_from_reprocessing_low_priority @@ -79,7 +79,7 @@ def _do_symbolicate_event( start_time: Optional[int], event_id: Optional[str], symbolicate_task: Any, - data: Optional[Event] = None, + data: Optional[Any] = None, ) -> None: from sentry.lang.native.processing import get_symbolication_function from sentry.tasks.store import _do_process_event, process_event, process_event_from_reprocessing From 984f1de992112b0e00840fd923ac4b0717e44dbb Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 5 Oct 2021 11:10:14 +0200 Subject: [PATCH 06/21] restore project and kwargs arguments --- src/sentry/tasks/store.py | 11 ++++++++++- src/sentry/tasks/symbolication.py | 7 +++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 00ae46f0978dea..a5c79a9279dd6b 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -64,6 +64,7 @@ def _should_process_inner(data: CanonicalKeyDict) -> bool: def submit_process( + project: Optional[Project], from_reprocessing: bool, cache_key: Optional[str], event_id: 
Optional[str], @@ -147,6 +148,7 @@ def _do_preprocess_event( is_low_priority = should_demote_symbolication(project_id) task = submit_symbolicate_low_priority if is_low_priority else submit_symbolicate task( + project, from_reprocessing, cache_key, event_id, @@ -157,6 +159,7 @@ def _do_preprocess_event( if should_process(data): submit_process( + project, from_reprocessing, cache_key, event_id, @@ -180,6 +183,7 @@ def preprocess_event( start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, + **kwargs: Any, ) -> None: return _do_preprocess_event( cache_key=cache_key, @@ -203,6 +207,7 @@ def preprocess_event_from_reprocessing( start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, + **kwargs: Any, ) -> None: return _do_preprocess_event( cache_key=cache_key, @@ -416,6 +421,7 @@ def process_event( start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, + **kwargs: Any, ) -> None: """ Handles event processing (for those events that need it) @@ -447,6 +453,7 @@ def process_event_from_reprocessing( start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, + **kwargs: Any, ) -> None: return _do_process_event( cache_key=cache_key, @@ -588,6 +595,7 @@ def _do_save_event( start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, + **kwargs: Any, ) -> None: """ Saves an event to the database. @@ -752,5 +760,6 @@ def save_event( start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, + **kwargs: Any, ) -> None: - _do_save_event(cache_key, data, start_time, event_id, project_id) + _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 93aff92ea5962a..d906f979e75490 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -9,6 +9,7 @@ from sentry import options from sentry.eventstore import processing from sentry.killswitches import killswitch_matches_context +from sentry.models import Project from sentry.processing import realtime_metrics # type: ignore from sentry.tasks.base import instrumented_task from sentry.utils import metrics @@ -49,6 +50,7 @@ def should_demote_symbolication(project_id: int) -> bool: def submit_symbolicate( + project: Optional[Project], from_reprocessing: bool, cache_key: Optional[str], event_id: Optional[str], @@ -60,6 +62,7 @@ def submit_symbolicate( def submit_symbolicate_low_priority( + project: Optional[Project], from_reprocessing: bool, cache_key: Optional[str], event_id: Optional[str], @@ -253,6 +256,7 @@ def symbolicate_event( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, + **kwargs: Any, ) -> None: """ Handles event symbolication using the external service: symbolicator. @@ -280,6 +284,7 @@ def symbolicate_event_low_priority( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, + **kwargs: Any, ) -> None: """ Handles event symbolication using the external service: symbolicator. 
@@ -310,6 +315,7 @@ def symbolicate_event_from_reprocessing( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, + **kwargs: Any, ) -> None: return _do_symbolicate_event( cache_key=cache_key, @@ -330,6 +336,7 @@ def symbolicate_event_from_reprocessing_low_priority( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, + **kwargs: Any, ) -> None: return _do_symbolicate_event( cache_key=cache_key, From 4d8ef4fb464963e45783fa6d3d73824e9f8a57ee Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Tue, 5 Oct 2021 16:34:18 +0200 Subject: [PATCH 07/21] Remove some Any types --- src/sentry/tasks/store.py | 42 +++++++++++++++++-------------- src/sentry/tasks/symbolication.py | 18 ++++++------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index a5c79a9279dd6b..f2598a858c9fba 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -1,7 +1,7 @@ import logging from datetime import datetime from time import time -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional import sentry_sdk from django.conf import settings @@ -86,7 +86,7 @@ def submit_save_event( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[Any], + data: Optional[processing.Event], ) -> None: if cache_key: data = None @@ -104,10 +104,10 @@ def submit_save_event( def _do_preprocess_event( cache_key: Optional[str], - data: Optional[Any], + data: Optional[processing.Event], start_time: Optional[int], event_id: Optional[str], - process_task: Any, + process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None], project: Optional[Project], ) -> None: from sentry.lang.native.processing import should_process_with_symbolicator @@ -179,11 +179,11 @@ def _do_preprocess_event( ) def preprocess_event( cache_key: Optional[str] = None, - data: Optional[Any] = None, + data: Optional[processing.Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: return _do_preprocess_event( cache_key=cache_key, @@ -203,11 +203,11 @@ def preprocess_event( ) def preprocess_event_from_reprocessing( cache_key: Optional[str] = None, - data: Optional[Any] = None, + data: Optional[processing.Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: return _do_preprocess_event( cache_key=cache_key, @@ -225,7 +225,9 @@ def preprocess_event_from_reprocessing( time_limit=(60 * 5) + 5, soft_time_limit=60 * 5, ) -def retry_process_event(process_task_name: str, task_kwargs: Any, **kwargs: Any) -> None: +def retry_process_event( + process_task_name: str, task_kwargs: Dict[str, Any], **kwargs: Dict[str, Any] +) -> None: """ The only purpose of this task is be enqueued with some ETA set. 
This is essentially an implementation of ETAs on top of Celery's existing ETAs, but @@ -247,8 +249,8 @@ def _do_process_event( cache_key: Optional[str], start_time: Optional[int], event_id: Optional[str], - process_task: Any, - data: Optional[Any] = None, + process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None], + data: Optional[processing.Event] = None, data_has_changed: bool = False, from_symbolicate: bool = False, ) -> None: @@ -421,7 +423,7 @@ def process_event( start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: """ Handles event processing (for those events that need it) @@ -453,7 +455,7 @@ def process_event_from_reprocessing( start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: return _do_process_event( cache_key=cache_key, @@ -495,7 +497,7 @@ def delete_raw_event( def create_failed_event( cache_key: Optional[str], - data: Optional[Any], + data: Optional[processing.Event], project_id: int, issues: List[Dict[str, str]], event_id: Optional[str], @@ -591,11 +593,11 @@ def create_failed_event( def _do_save_event( cache_key: Optional[str] = None, - data: Optional[Any] = None, + data: Optional[processing.Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: """ Saves an event to the database. @@ -700,7 +702,9 @@ def _do_save_event( time_synthetic_monitoring_event(data, project_id, start_time) -def time_synthetic_monitoring_event(data: Any, project_id: int, start_time: Optional[int]) -> bool: +def time_synthetic_monitoring_event( + data: processing.Event, project_id: int, start_time: Optional[int] +) -> bool: """ For special events produced by the recurring synthetic monitoring functions, emit timing metrics for: @@ -756,10 +760,10 @@ def time_synthetic_monitoring_event(data: Any, project_id: int, start_time: Opti ) def save_event( cache_key: Optional[str] = None, - data: Optional[Any] = None, + data: Optional[processing.Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index d906f979e75490..f87a96bfc18dac 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -1,7 +1,7 @@ import logging import random from time import sleep, time -from typing import Any, Optional +from typing import Any, Callable, Dict, Optional import sentry_sdk from django.conf import settings @@ -55,7 +55,7 @@ def submit_symbolicate( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[Any], + data: Optional[processing.Event], ) -> None: task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) @@ -67,7 +67,7 @@ def submit_symbolicate_low_priority( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[Any], + data: Optional[processing.Event], ) -> None: task = ( symbolicate_event_from_reprocessing_low_priority @@ -81,8 +81,8 @@ def _do_symbolicate_event( cache_key: Optional[str], 
start_time: Optional[int], event_id: Optional[str], - symbolicate_task: Any, - data: Optional[Any] = None, + symbolicate_task: Callable[[Optional[str], Optional[int], Optional[str]], None], + data: Optional[processing.Event] = None, ) -> None: from sentry.lang.native.processing import get_symbolication_function from sentry.tasks.store import _do_process_event, process_event, process_event_from_reprocessing @@ -256,7 +256,7 @@ def symbolicate_event( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: """ Handles event symbolication using the external service: symbolicator. @@ -284,7 +284,7 @@ def symbolicate_event_low_priority( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: """ Handles event symbolication using the external service: symbolicator. @@ -315,7 +315,7 @@ def symbolicate_event_from_reprocessing( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: return _do_symbolicate_event( cache_key=cache_key, @@ -336,7 +336,7 @@ def symbolicate_event_from_reprocessing_low_priority( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Any, + **kwargs: Dict[str, Any], ) -> None: return _do_symbolicate_event( cache_key=cache_key, From 1c5503aa757ff567e7c2f7a053190decc247961f Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 6 Oct 2021 12:21:17 +0200 Subject: [PATCH 08/21] Remove unused type ignore --- src/sentry/tasks/symbolication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index f87a96bfc18dac..750f43f6f005fb 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -10,7 +10,7 @@ from sentry.eventstore import processing from sentry.killswitches import killswitch_matches_context from sentry.models import Project -from sentry.processing import realtime_metrics # type: ignore +from sentry.processing import realtime_metrics from sentry.tasks.base import instrumented_task from sentry.utils import metrics from sentry.utils.canonical import CANONICAL_TYPES, CanonicalKeyDict From 6575788a00c912df2820ce48c81650fa97e7089f Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 6 Oct 2021 12:31:07 +0200 Subject: [PATCH 09/21] Correctly(?) 
import Event --- src/sentry/tasks/store.py | 19 ++++++++++--------- src/sentry/tasks/symbolication.py | 7 ++++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index f2598a858c9fba..ae8b8c51987637 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -13,6 +13,7 @@ from sentry.constants import DEFAULT_STORE_NORMALIZER_ARGS from sentry.datascrubbing import scrub_data from sentry.eventstore import processing +from sentry.eventstore.processing.base import Event from sentry.killswitches import killswitch_matches_context from sentry.models import Activity, Organization, Project, ProjectOption from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces @@ -86,7 +87,7 @@ def submit_save_event( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[processing.Event], + data: Optional[Event], ) -> None: if cache_key: data = None @@ -104,7 +105,7 @@ def submit_save_event( def _do_preprocess_event( cache_key: Optional[str], - data: Optional[processing.Event], + data: Optional[Event], start_time: Optional[int], event_id: Optional[str], process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None], @@ -179,7 +180,7 @@ def _do_preprocess_event( ) def preprocess_event( cache_key: Optional[str] = None, - data: Optional[processing.Event] = None, + data: Optional[Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, @@ -203,7 +204,7 @@ def preprocess_event( ) def preprocess_event_from_reprocessing( cache_key: Optional[str] = None, - data: Optional[processing.Event] = None, + data: Optional[Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, @@ -250,7 +251,7 @@ def _do_process_event( start_time: Optional[int], event_id: Optional[str], process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None], - data: Optional[processing.Event] = None, + data: Optional[Event] = None, data_has_changed: bool = False, from_symbolicate: bool = False, ) -> None: @@ -497,7 +498,7 @@ def delete_raw_event( def create_failed_event( cache_key: Optional[str], - data: Optional[processing.Event], + data: Optional[Event], project_id: int, issues: List[Dict[str, str]], event_id: Optional[str], @@ -593,7 +594,7 @@ def create_failed_event( def _do_save_event( cache_key: Optional[str] = None, - data: Optional[processing.Event] = None, + data: Optional[Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, @@ -703,7 +704,7 @@ def _do_save_event( def time_synthetic_monitoring_event( - data: processing.Event, project_id: int, start_time: Optional[int] + data: Event, project_id: int, start_time: Optional[int] ) -> bool: """ For special events produced by the recurring synthetic monitoring @@ -760,7 +761,7 @@ def time_synthetic_monitoring_event( ) def save_event( cache_key: Optional[str] = None, - data: Optional[processing.Event] = None, + data: Optional[Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 750f43f6f005fb..1dc8e3926797d9 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -8,6 +8,7 @@ from sentry import options from sentry.eventstore import processing +from 
sentry.eventstore.processing.base import Event from sentry.killswitches import killswitch_matches_context from sentry.models import Project from sentry.processing import realtime_metrics @@ -55,7 +56,7 @@ def submit_symbolicate( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[processing.Event], + data: Optional[Event], ) -> None: task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) @@ -67,7 +68,7 @@ def submit_symbolicate_low_priority( cache_key: Optional[str], event_id: Optional[str], start_time: Optional[int], - data: Optional[processing.Event], + data: Optional[Event], ) -> None: task = ( symbolicate_event_from_reprocessing_low_priority @@ -82,7 +83,7 @@ def _do_symbolicate_event( start_time: Optional[int], event_id: Optional[str], symbolicate_task: Callable[[Optional[str], Optional[int], Optional[str]], None], - data: Optional[processing.Event] = None, + data: Optional[Event] = None, ) -> None: from sentry.lang.native.processing import get_symbolication_function from sentry.tasks.store import _do_process_event, process_event, process_event_from_reprocessing From 435de3aeb19addd94663312cc6d50fa0340552c0 Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Wed, 6 Oct 2021 11:42:41 +0000 Subject: [PATCH 10/21] style(lint): Auto commit lint changes --- src/sentry/tasks/store.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 546c1090ffa4bc..06fd4d43f532eb 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -75,7 +75,6 @@ def submit_process( ) - def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data): if cache_key: data = None From 14a708bb62eded8c7049ff19de941552d5a02a2d Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Wed, 6 Oct 2021 11:48:41 +0000 Subject: [PATCH 11/21] style(lint): Auto commit lint changes --- src/sentry/tasks/symbolication.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 098927cf4e4270..8ddd4d9eafb9cb 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -64,6 +64,7 @@ def should_demote_symbolication(project_id: int) -> bool: project_id ) + def submit_symbolicate( project: Optional[Project], is_low_priority: bool, From e7532660e2d345af928b433dbbf463390b811fb2 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 6 Oct 2021 14:31:35 +0200 Subject: [PATCH 12/21] Remove unused import --- src/sentry/tasks/store.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 546c1090ffa4bc..e1195aee443662 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -75,7 +75,6 @@ def submit_process( ) - def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data): if cache_key: data = None @@ -93,11 +92,7 @@ def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project): from sentry.lang.native.processing import should_process_with_symbolicator - from sentry.tasks.symbolication import ( - should_demote_symbolication, - submit_symbolicate, - 
submit_symbolicate_low_priority, - ) + from sentry.tasks.symbolication import should_demote_symbolication, submit_symbolicate if cache_key and data is None: data = processing.event_processing_store.get(cache_key) From b0f7f824a795cb5eda9d773c42158279f7e38df3 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 6 Oct 2021 14:53:04 +0200 Subject: [PATCH 13/21] Fix import --- src/sentry/lang/native/symbolicator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/lang/native/symbolicator.py b/src/sentry/lang/native/symbolicator.py index 8788477c7279e4..a32029c4f08ecf 100644 --- a/src/sentry/lang/native/symbolicator.py +++ b/src/sentry/lang/native/symbolicator.py @@ -18,7 +18,7 @@ from sentry.cache import default_cache from sentry.models import Organization from sentry.net.http import Session -from sentry.tasks.store import RetrySymbolication +from sentry.tasks.symbolication import RetrySymbolication from sentry.utils import json, metrics, safe MAX_ATTEMPTS = 3 From 0190ab36fd39280965aeefabffffa7f96adacef2 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 6 Oct 2021 15:27:35 +0200 Subject: [PATCH 14/21] Fix submit_symbolicate --- src/sentry/tasks/symbolication.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 8ddd4d9eafb9cb..806e8999a1e2f8 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -83,6 +83,8 @@ def submit_symbolicate( else: task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event + task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id) + def _do_symbolicate_event( cache_key: Optional[str], From 6f1fd9005a86a6097bab1c3b143cf9f491b9c0c9 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 6 Oct 2021 18:41:33 +0200 Subject: [PATCH 15/21] Correctly type kwargs --- src/sentry/tasks/store.py | 16 +++++++--------- src/sentry/tasks/symbolication.py | 10 +++++----- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 93da945295ef05..062b91a3aad2de 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -180,7 +180,7 @@ def preprocess_event( start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: return _do_preprocess_event( cache_key=cache_key, @@ -204,7 +204,7 @@ def preprocess_event_from_reprocessing( start_time: Optional[int] = None, event_id: Optional[str] = None, project: Optional[Project] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: return _do_preprocess_event( cache_key=cache_key, @@ -222,9 +222,7 @@ def preprocess_event_from_reprocessing( time_limit=(60 * 5) + 5, soft_time_limit=60 * 5, ) -def retry_process_event( - process_task_name: str, task_kwargs: Dict[str, Any], **kwargs: Dict[str, Any] -) -> None: +def retry_process_event(process_task_name: str, task_kwargs: Dict[str, Any], **kwargs: Any) -> None: """ The only purpose of this task is be enqueued with some ETA set. 
This is essentially an implementation of ETAs on top of Celery's existing ETAs, but @@ -420,7 +418,7 @@ def process_event( start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: """ Handles event processing (for those events that need it) @@ -452,7 +450,7 @@ def process_event_from_reprocessing( start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: return _do_process_event( cache_key=cache_key, @@ -594,7 +592,7 @@ def _do_save_event( start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: """ Saves an event to the database. @@ -761,6 +759,6 @@ def save_event( start_time: Optional[int] = None, event_id: Optional[str] = None, project_id: Optional[int] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 806e8999a1e2f8..239a1c72e8c88e 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -1,7 +1,7 @@ import logging import random from time import sleep, time -from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Optional import sentry_sdk from django.conf import settings @@ -265,7 +265,7 @@ def symbolicate_event( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: """ Handles event symbolication using the external service: symbolicator. @@ -293,7 +293,7 @@ def symbolicate_event_low_priority( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: """ Handles event symbolication using the external service: symbolicator. 
@@ -324,7 +324,7 @@ def symbolicate_event_from_reprocessing( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: return _do_symbolicate_event( cache_key=cache_key, @@ -345,7 +345,7 @@ def symbolicate_event_from_reprocessing_low_priority( cache_key: Optional[str], start_time: Optional[int] = None, event_id: Optional[str] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: return _do_symbolicate_event( cache_key=cache_key, From a7936e227f4f23d41cb8f89a1e3427e4b7452dec Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Wed, 6 Oct 2021 18:42:10 +0200 Subject: [PATCH 16/21] Fix None issue --- src/sentry/tasks/symbolication.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 239a1c72e8c88e..4098a5ae90786c 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -211,9 +211,9 @@ def _continue_to_process_event() -> None: tags={"symbolication_function": symbolication_function_name}, ) sleep_time = ( - min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER) - if e.retry_after - else SYMBOLICATOR_MAX_RETRY_AFTER + SYMBOLICATOR_MAX_RETRY_AFTER + if e.retry_after is None + else min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER) ) sleep(sleep_time) continue From 04340b22e07f3202bfdbd90823251dd0492427c9 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Thu, 7 Oct 2021 13:43:04 +0200 Subject: [PATCH 17/21] Make some cache_keys non-optional --- src/sentry/tasks/store.py | 16 ++++++++-------- src/sentry/tasks/symbolication.py | 12 ++++++------ tests/sentry/tasks/test_store.py | 8 ++++---- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 062b91a3aad2de..84c48bda73efaf 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -67,7 +67,7 @@ def _should_process_inner(data: CanonicalKeyDict) -> bool: def submit_process( project: Optional[Project], from_reprocessing: bool, - cache_key: Optional[str], + cache_key: str, event_id: Optional[str], start_time: Optional[int], data_has_changed: bool = False, @@ -104,7 +104,7 @@ def submit_save_event( def _do_preprocess_event( - cache_key: Optional[str], + cache_key: str, data: Optional[Event], start_time: Optional[int], event_id: Optional[str], @@ -175,7 +175,7 @@ def _do_preprocess_event( soft_time_limit=60, ) def preprocess_event( - cache_key: Optional[str] = None, + cache_key: str, data: Optional[Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, @@ -199,7 +199,7 @@ def preprocess_event( soft_time_limit=60, ) def preprocess_event_from_reprocessing( - cache_key: Optional[str] = None, + cache_key: str, data: Optional[Event] = None, start_time: Optional[int] = None, event_id: Optional[str] = None, @@ -241,7 +241,7 @@ def retry_process_event(process_task_name: str, task_kwargs: Dict[str, Any], **k def _do_process_event( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int], event_id: Optional[str], process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None], @@ -414,7 +414,7 @@ def _continue_to_save_event() -> None: soft_time_limit=60, ) def process_event( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, @@ -446,7 +446,7 @@ def process_event( soft_time_limit=60, ) def 
process_event_from_reprocessing( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, data_has_changed: bool = False, @@ -491,7 +491,7 @@ def delete_raw_event( def create_failed_event( - cache_key: Optional[str], + cache_key: str, data: Optional[Event], project_id: int, issues: List[Dict[str, str]], diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index 4098a5ae90786c..8f5b695f69c8a3 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -69,7 +69,7 @@ def submit_symbolicate( project: Optional[Project], is_low_priority: bool, from_reprocessing: bool, - cache_key: Optional[str], + cache_key: str, event_id: Optional[str], start_time: Optional[int], data: Optional[Event], @@ -87,7 +87,7 @@ def submit_symbolicate( def _do_symbolicate_event( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int], event_id: Optional[str], symbolicate_task: Callable[[Optional[str], Optional[int], Optional[str]], None], @@ -262,7 +262,7 @@ def _continue_to_process_event() -> None: acks_late=True, ) def symbolicate_event( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, **kwargs: Any, @@ -290,7 +290,7 @@ def symbolicate_event( acks_late=True, ) def symbolicate_event_low_priority( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, **kwargs: Any, @@ -321,7 +321,7 @@ def symbolicate_event_low_priority( acks_late=True, ) def symbolicate_event_from_reprocessing( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, **kwargs: Any, @@ -342,7 +342,7 @@ def symbolicate_event_from_reprocessing( acks_late=True, ) def symbolicate_event_from_reprocessing_low_priority( - cache_key: Optional[str], + cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, **kwargs: Any, diff --git a/tests/sentry/tasks/test_store.py b/tests/sentry/tasks/test_store.py index 20e8a29725e3ad..92774d718fcd8d 100644 --- a/tests/sentry/tasks/test_store.py +++ b/tests/sentry/tasks/test_store.py @@ -105,7 +105,7 @@ def test_move_to_process_event( "extra": {"foo": "bar"}, } - preprocess_event(data=data) + preprocess_event(cache_key="", data=data) assert mock_symbolicate_event.delay.call_count == 0 assert mock_process_event.delay.call_count == 1 @@ -125,7 +125,7 @@ def test_move_to_symbolicate_event( "extra": {"foo": "bar"}, } - preprocess_event(data=data) + preprocess_event(cache_key="", data=data) assert mock_symbolicate_event.delay.call_count == 1 assert mock_process_event.delay.call_count == 0 @@ -151,7 +151,7 @@ def test_move_to_symbolicate_event_low_priority( "extra": {"foo": "bar"}, } - preprocess_event(data=data) + preprocess_event(cache_key="", data=data) assert mock_symbolicate_event_low_priority.delay.call_count == 1 assert mock_symbolicate_event.delay.call_count == 0 @@ -216,7 +216,7 @@ def test_move_to_save_event( "extra": {"foo": "bar"}, } - preprocess_event(data=data) + preprocess_event(cache_key="", data=data) assert mock_symbolicate_event.delay.call_count == 0 assert mock_process_event.delay.call_count == 0 From 427f9489debed3a17741c45eae3f74f001fbd79f Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Wed, 13 Oct 2021 11:57:51 +0000 Subject: [PATCH 18/21] style(lint): Auto commit lint changes --- src/sentry/tasks/symbolication.py | 8 ++++---- 
1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index e183bcdfacf09c..0695dd13d3d67f 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -303,7 +303,7 @@ def symbolicate_event( cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, - data: Optional[Event]=None, + data: Optional[Event] = None, queue_switches: int = 0, **kwargs: Any, ) -> None: @@ -335,7 +335,7 @@ def symbolicate_event_low_priority( cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, - data: Optional[Event]=None, + data: Optional[Event] = None, queue_switches: int = 0, **kwargs: Any, ) -> None: @@ -370,7 +370,7 @@ def symbolicate_event_from_reprocessing( cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, - data: Optional[Event]=None, + data: Optional[Event] = None, queue_switches: int = 0, **kwargs: Any, ) -> None: @@ -395,7 +395,7 @@ def symbolicate_event_from_reprocessing_low_priority( cache_key: str, start_time: Optional[int] = None, event_id: Optional[str] = None, - data: Optional[Event]=None, + data: Optional[Event] = None, queue_switches: int = 0, **kwargs: Any, ) -> None: From 329c4c3b58b2e8ee1de3b91a857c19571b60430b Mon Sep 17 00:00:00 2001 From: Betty Da Date: Fri, 15 Oct 2021 00:15:23 -0400 Subject: [PATCH 19/21] remove internal marker from do_process_event --- src/sentry/tasks/store.py | 6 +++--- src/sentry/tasks/symbolication.py | 6 +++--- tests/sentry/tasks/test_store.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index e1195aee443662..690fe22c646ae7 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -209,7 +209,7 @@ def retry_process_event(process_task_name, task_kwargs, **kwargs): process_task.delay(**task_kwargs) -def _do_process_event( +def do_process_event( cache_key, start_time, event_id, @@ -393,7 +393,7 @@ def process_event(cache_key, start_time=None, event_id=None, data_has_changed=No :param string event_id: the event identifier :param boolean data_has_changed: set to True if the event data was changed in previous tasks """ - return _do_process_event( + return do_process_event( cache_key=cache_key, start_time=start_time, event_id=event_id, @@ -411,7 +411,7 @@ def process_event(cache_key, start_time=None, event_id=None, data_has_changed=No def process_event_from_reprocessing( cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs ): - return _do_process_event( + return do_process_event( cache_key=cache_key, start_time=start_time, event_id=event_id, diff --git a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index fc7f358a05c07c..3d61d02cae757c 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -91,7 +91,7 @@ def _do_symbolicate_event( cache_key, start_time, event_id, symbolicate_task, data=None, queue_switches=0 ): from sentry.lang.native.processing import get_symbolication_function - from sentry.tasks.store import _do_process_event, process_event, process_event_from_reprocessing + from sentry.tasks.store import do_process_event, process_event, process_event_from_reprocessing if data is None: data = processing.event_processing_store.get(cache_key) @@ -141,8 +141,8 @@ def _do_symbolicate_event( def _continue_to_process_event(): process_task = process_event_from_reprocessing if from_reprocessing else process_event - # 
TODO: this uses a private "store" function. is there a way to not do this? - _do_process_event( + # TODO: figure out a way to do this without directly invoking do_process_event + do_process_event( cache_key=cache_key, start_time=start_time, event_id=event_id, diff --git a/tests/sentry/tasks/test_store.py b/tests/sentry/tasks/test_store.py index ee1209cf1a6e30..1f9930385ba70d 100644 --- a/tests/sentry/tasks/test_store.py +++ b/tests/sentry/tasks/test_store.py @@ -202,7 +202,7 @@ def test_symbolicate_event_call_process_inline( mock_get_symbolication_function.return_value = lambda _: symbolicated_data - with mock.patch("sentry.tasks.store._do_process_event") as mock_do_process_event: + with mock.patch("sentry.tasks.store.do_process_event") as mock_do_process_event: symbolicate_event(cache_key="e:1", start_time=1) # The event mutated, so make sure we save it back From f186a75c688b2189a6036eccd37f17d63413ffe3 Mon Sep 17 00:00:00 2001 From: Betty Da Date: Fri, 15 Oct 2021 00:25:05 -0400 Subject: [PATCH 20/21] split tests --- tests/sentry/tasks/test_store.py | 168 ---------------- tests/sentry/tasks/test_symbolication.py | 246 +++++++++++++++++++++++ 2 files changed, 246 insertions(+), 168 deletions(-) create mode 100644 tests/sentry/tasks/test_symbolication.py diff --git a/tests/sentry/tasks/test_store.py b/tests/sentry/tasks/test_store.py index 1f9930385ba70d..e1b889544404fb 100644 --- a/tests/sentry/tasks/test_store.py +++ b/tests/sentry/tasks/test_store.py @@ -12,13 +12,6 @@ save_event, time_synthetic_monitoring_event, ) -from sentry.tasks.symbolication import ( - should_demote_symbolication, - submit_symbolicate, - symbolicate_event, -) -from sentry.testutils.helpers.options import override_options -from sentry.testutils.helpers.task_runner import TaskRunner from sentry.utils.compat import mock EVENT_ID = "cc3e6c2bb6b6498097f336d1e6979f4b" @@ -97,21 +90,6 @@ def mock_metrics_timing(): yield m -@pytest.fixture -def mock_should_demote_symbolication(): - with mock.patch( - "sentry.tasks.symbolication.should_demote_symbolication", - side_effect=[True, False, True, False, True], - ) as m: - yield m - - -@pytest.fixture -def mock_submit_symbolicate(): - with mock.patch("sentry.tasks.symbolication.submit_symbolicate", wraps=submit_symbolicate) as m: - yield m - - @pytest.mark.django_db def test_move_to_process_event( default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin @@ -132,97 +110,6 @@ def test_move_to_process_event( assert mock_save_event.delay.call_count == 0 -@pytest.mark.django_db -def test_move_to_symbolicate_event( - default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin -): - register_plugin(globals(), BasicPreprocessorPlugin) - data = { - "project": default_project.id, - "platform": "native", - "logentry": {"formatted": "test"}, - "event_id": EVENT_ID, - "extra": {"foo": "bar"}, - } - - preprocess_event(data=data) - - assert mock_symbolicate_event.delay.call_count == 1 - assert mock_process_event.delay.call_count == 0 - assert mock_save_event.delay.call_count == 0 - - -@pytest.mark.django_db -def test_move_to_symbolicate_event_low_priority( - default_project, - mock_process_event, - mock_save_event, - mock_symbolicate_event, - mock_symbolicate_event_low_priority, - register_plugin, -): - with override_options({"store.symbolicate-event-lpq-always": [default_project.id]}): - register_plugin(globals(), BasicPreprocessorPlugin) - data = { - "project": default_project.id, - "platform": "native", - "logentry": 
{"formatted": "test"}, - "event_id": EVENT_ID, - "extra": {"foo": "bar"}, - } - - preprocess_event(data=data) - - assert mock_symbolicate_event_low_priority.delay.call_count == 1 - assert mock_symbolicate_event.delay.call_count == 0 - assert mock_process_event.delay.call_count == 0 - assert mock_save_event.delay.call_count == 0 - - -@pytest.mark.django_db -def test_symbolicate_event_call_process_inline( - default_project, - mock_event_processing_store, - mock_process_event, - mock_save_event, - mock_get_symbolication_function, - register_plugin, -): - register_plugin(globals(), BasicPreprocessorPlugin) - data = { - "project": default_project.id, - "platform": "native", - "event_id": EVENT_ID, - "extra": {"foo": "bar"}, - } - mock_event_processing_store.get.return_value = data - mock_event_processing_store.store.return_value = "e:1" - - symbolicated_data = {"type": "error"} - - mock_get_symbolication_function.return_value = lambda _: symbolicated_data - - with mock.patch("sentry.tasks.store.do_process_event") as mock_do_process_event: - symbolicate_event(cache_key="e:1", start_time=1) - - # The event mutated, so make sure we save it back - ((_, (event,), _),) = mock_event_processing_store.store.mock_calls - - assert event == symbolicated_data - - assert mock_save_event.delay.call_count == 0 - assert mock_process_event.delay.call_count == 0 - mock_do_process_event.assert_called_once_with( - cache_key="e:1", - start_time=1, - event_id=EVENT_ID, - process_task=mock_process_event, - data=symbolicated_data, - data_has_changed=True, - from_symbolicate=True, - ) - - @pytest.mark.django_db def test_move_to_save_event( default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin @@ -463,58 +350,3 @@ def test_time_synthetic_monitoring_event_in_save_event(mock_metrics_timing): mock.ANY, ) assert to_process.kwargs == {"tags": tags, "sample_rate": 1.0} - - -@pytest.mark.django_db -def test_should_demote_symbolication_empty(default_project): - assert not should_demote_symbolication(default_project.id) - - -@pytest.mark.django_db -def test_should_demote_symbolication_always(default_project): - with override_options({"store.symbolicate-event-lpq-always": [default_project.id]}): - assert should_demote_symbolication(default_project.id) - - -@pytest.mark.django_db -def test_should_demote_symbolication_never(default_project): - with override_options({"store.symbolicate-event-lpq-never": [default_project.id]}): - assert not should_demote_symbolication(default_project.id) - - -@pytest.mark.django_db -def test_should_demote_symbolication_always_and_never(default_project): - with override_options( - { - "store.symbolicate-event-lpq-never": [default_project.id], - "store.symbolicate-event-lpq-always": [default_project.id], - } - ): - assert not should_demote_symbolication(default_project.id) - - -@pytest.mark.django_db -def test_submit_symbolicate_queue_switch( - default_project, mock_should_demote_symbolication, mock_submit_symbolicate -): - data = { - "project": default_project.id, - "platform": "native", - "logentry": {"formatted": "test"}, - "event_id": EVENT_ID, - "extra": {"foo": "bar"}, - } - - is_low_priority = mock_should_demote_symbolication(default_project.id) - assert is_low_priority - - with TaskRunner(): - mock_submit_symbolicate( - is_low_priority=is_low_priority, - from_reprocessing=False, - cache_key="e:1", - event_id=EVENT_ID, - start_time=0, - data=data, - ) - assert mock_submit_symbolicate.call_count == 4 diff --git a/tests/sentry/tasks/test_symbolication.py 
b/tests/sentry/tasks/test_symbolication.py new file mode 100644 index 00000000000000..5568c2c930693c --- /dev/null +++ b/tests/sentry/tasks/test_symbolication.py @@ -0,0 +1,246 @@ +import pytest + +from sentry.plugins.base.v2 import Plugin2 +from sentry.tasks.store import preprocess_event +from sentry.tasks.symbolication import ( + should_demote_symbolication, + submit_symbolicate, + symbolicate_event, +) +from sentry.testutils.helpers.options import override_options +from sentry.testutils.helpers.task_runner import TaskRunner +from sentry.utils.compat import mock + +EVENT_ID = "cc3e6c2bb6b6498097f336d1e6979f4b" + + +class BasicPreprocessorPlugin(Plugin2): + def get_event_preprocessors(self, data): + def remove_extra(data): + del data["extra"] + return data + + def put_on_hold(data): + data["unprocessed"] = True + return data + + if data.get("platform") == "mattlang": + return [remove_extra, lambda x: None] + + if data.get("platform") == "noop": + return [lambda data: None] + + if data.get("platform") == "holdmeclose": + return [put_on_hold] + + return [] + + def is_enabled(self, project=None): + return True + + +@pytest.fixture +def mock_save_event(): + with mock.patch("sentry.tasks.store.save_event") as m: + yield m + + +@pytest.fixture +def mock_process_event(): + with mock.patch("sentry.tasks.store.process_event") as m: + yield m + + +@pytest.fixture +def mock_symbolicate_event(): + with mock.patch("sentry.tasks.symbolication.symbolicate_event") as m: + yield m + + +@pytest.fixture +def mock_symbolicate_event_low_priority(): + with mock.patch("sentry.tasks.symbolication.symbolicate_event_low_priority") as m: + yield m + + +@pytest.fixture +def mock_get_symbolication_function(): + with mock.patch("sentry.lang.native.processing.get_symbolication_function") as m: + yield m + + +@pytest.fixture +def mock_event_processing_store(): + with mock.patch("sentry.eventstore.processing.event_processing_store") as m: + yield m + + +@pytest.fixture +def mock_should_demote_symbolication(): + with mock.patch( + "sentry.tasks.symbolication.should_demote_symbolication", + side_effect=[True, False, True, False, True], + ) as m: + yield m + + +@pytest.fixture +def mock_submit_symbolicate(): + with mock.patch("sentry.tasks.symbolication.submit_symbolicate", wraps=submit_symbolicate) as m: + yield m + + +@pytest.mark.django_db +def test_move_to_symbolicate_event( + default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin +): + register_plugin(globals(), BasicPreprocessorPlugin) + data = { + "project": default_project.id, + "platform": "native", + "logentry": {"formatted": "test"}, + "event_id": EVENT_ID, + "extra": {"foo": "bar"}, + } + + preprocess_event(data=data) + + assert mock_symbolicate_event.delay.call_count == 1 + assert mock_process_event.delay.call_count == 0 + assert mock_save_event.delay.call_count == 0 + + +@pytest.mark.django_db +def test_move_to_symbolicate_event_low_priority( + default_project, + mock_process_event, + mock_save_event, + mock_symbolicate_event, + mock_symbolicate_event_low_priority, + register_plugin, +): + with override_options({"store.symbolicate-event-lpq-always": [default_project.id]}): + register_plugin(globals(), BasicPreprocessorPlugin) + data = { + "project": default_project.id, + "platform": "native", + "logentry": {"formatted": "test"}, + "event_id": EVENT_ID, + "extra": {"foo": "bar"}, + } + + preprocess_event(data=data) + + assert mock_symbolicate_event_low_priority.delay.call_count == 1 + assert 
mock_symbolicate_event.delay.call_count == 0 + assert mock_process_event.delay.call_count == 0 + assert mock_save_event.delay.call_count == 0 + + +@pytest.mark.django_db +def test_symbolicate_event_call_process_inline( + default_project, + mock_event_processing_store, + mock_process_event, + mock_save_event, + mock_get_symbolication_function, + register_plugin, +): + register_plugin(globals(), BasicPreprocessorPlugin) + data = { + "project": default_project.id, + "platform": "native", + "event_id": EVENT_ID, + "extra": {"foo": "bar"}, + } + mock_event_processing_store.get.return_value = data + mock_event_processing_store.store.return_value = "e:1" + + symbolicated_data = {"type": "error"} + + mock_get_symbolication_function.return_value = lambda _: symbolicated_data + + with mock.patch("sentry.tasks.store.do_process_event") as mock_do_process_event: + symbolicate_event(cache_key="e:1", start_time=1) + + # The event mutated, so make sure we save it back + ((_, (event,), _),) = mock_event_processing_store.store.mock_calls + + assert event == symbolicated_data + + assert mock_save_event.delay.call_count == 0 + assert mock_process_event.delay.call_count == 0 + mock_do_process_event.assert_called_once_with( + cache_key="e:1", + start_time=1, + event_id=EVENT_ID, + process_task=mock_process_event, + data=symbolicated_data, + data_has_changed=True, + from_symbolicate=True, + ) + + +@pytest.fixture(params=["org", "project"]) +def options_model(request, default_organization, default_project): + if request.param == "org": + return default_organization + elif request.param == "project": + return default_project + else: + raise ValueError(request.param) + + +@pytest.mark.django_db +def test_should_demote_symbolication_empty(default_project): + assert not should_demote_symbolication(default_project.id) + + +@pytest.mark.django_db +def test_should_demote_symbolication_always(default_project): + with override_options({"store.symbolicate-event-lpq-always": [default_project.id]}): + assert should_demote_symbolication(default_project.id) + + +@pytest.mark.django_db +def test_should_demote_symbolication_never(default_project): + with override_options({"store.symbolicate-event-lpq-never": [default_project.id]}): + assert not should_demote_symbolication(default_project.id) + + +@pytest.mark.django_db +def test_should_demote_symbolication_always_and_never(default_project): + with override_options( + { + "store.symbolicate-event-lpq-never": [default_project.id], + "store.symbolicate-event-lpq-always": [default_project.id], + } + ): + assert not should_demote_symbolication(default_project.id) + + +@pytest.mark.django_db +def test_submit_symbolicate_queue_switch( + default_project, mock_should_demote_symbolication, mock_submit_symbolicate +): + data = { + "project": default_project.id, + "platform": "native", + "logentry": {"formatted": "test"}, + "event_id": EVENT_ID, + "extra": {"foo": "bar"}, + } + + is_low_priority = mock_should_demote_symbolication(default_project.id) + assert is_low_priority + + with TaskRunner(): + mock_submit_symbolicate( + is_low_priority=is_low_priority, + from_reprocessing=False, + cache_key="e:1", + event_id=EVENT_ID, + start_time=0, + data=data, + ) + assert mock_submit_symbolicate.call_count == 4 From 18299f5a74cf34ce832f249d1b352a8c5cef19c7 Mon Sep 17 00:00:00 2001 From: Sebastian Zivota Date: Fri, 15 Oct 2021 15:08:11 +0200 Subject: [PATCH 21/21] fix test --- src/sentry/tasks/symbolication.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git 
a/src/sentry/tasks/symbolication.py b/src/sentry/tasks/symbolication.py index c01e1cc95bb717..abb60ad766ec02 100644 --- a/src/sentry/tasks/symbolication.py +++ b/src/sentry/tasks/symbolication.py @@ -6,11 +6,11 @@ from django.conf import settings from sentry import options -from sentry.eventstore.processing import event_processing_store +from sentry.eventstore import processing from sentry.killswitches import killswitch_matches_context from sentry.processing import realtime_metrics +from sentry.tasks import store from sentry.tasks.base import instrumented_task -from sentry.tasks.store import do_process_event, process_event, process_event_from_reprocessing from sentry.utils import metrics from sentry.utils.canonical import CANONICAL_TYPES, CanonicalKeyDict from sentry.utils.sdk import set_current_event_project @@ -94,7 +94,7 @@ def _do_symbolicate_event( from sentry.lang.native.processing import get_symbolication_function if data is None: - data = event_processing_store.get(cache_key) + data = processing.event_processing_store.get(cache_key) if data is None: metrics.incr( @@ -144,8 +144,10 @@ def _do_symbolicate_event( return def _continue_to_process_event(): - process_task = process_event_from_reprocessing if from_reprocessing else process_event - do_process_event( + process_task = ( + store.process_event_from_reprocessing if from_reprocessing else store.process_event + ) + store.do_process_event( cache_key=cache_key, start_time=start_time, event_id=event_id, @@ -273,7 +275,7 @@ def _continue_to_process_event(): data = dict(data.items()) if has_changed: - cache_key = event_processing_store.store(data) + cache_key = processing.event_processing_store.store(data) return _continue_to_process_event()
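
For readers following the series without the full files: patch 16 ("Fix None issue") only rewrites the back-off conditional around RetrySymbolication so that the truthiness check becomes an explicit `is None` check; a missing hint falls back to the configured cap, and an explicit hint is honoured but never allowed to exceed the cap. Below is a minimal, self-contained sketch of that behaviour. It is illustrative only: the constant value is a placeholder (the real cap comes from Django settings), the exception stand-in mirrors just the `retry_after` attribute the loop reads, and the helper name `retry_sleep_seconds` does not exist in the PR, where the expression lives inline in `_do_symbolicate_event` and is followed by `sleep(sleep_time)` and `continue`.

from typing import Optional

# Placeholder; in the PR this comes from settings.SYMBOLICATOR_MAX_RETRY_AFTER.
SYMBOLICATOR_MAX_RETRY_AFTER = 5


class RetrySymbolication(Exception):
    """Stand-in for the retry signal raised while polling symbolicator."""

    def __init__(self, retry_after: Optional[int] = None) -> None:
        self.retry_after = retry_after


def retry_sleep_seconds(e: RetrySymbolication) -> int:
    # No hint from the service: wait the full cap. Otherwise honour the hint,
    # but never wait longer than the cap.
    return (
        SYMBOLICATOR_MAX_RETRY_AFTER
        if e.retry_after is None
        else min(e.retry_after, SYMBOLICATOR_MAX_RETRY_AFTER)
    )


if __name__ == "__main__":
    # Missing hint falls back to the cap; large hints are clamped; small hints pass through.
    assert retry_sleep_seconds(RetrySymbolication()) == SYMBOLICATOR_MAX_RETRY_AFTER
    assert retry_sleep_seconds(RetrySymbolication(retry_after=120)) == SYMBOLICATOR_MAX_RETRY_AFTER
    assert retry_sleep_seconds(RetrySymbolication(retry_after=1)) == 1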
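
In the same spirit, here is a rough sketch of the four-way queue dispatch that patch 14 ("Fix submit_symbolicate") completes by restoring the final `task.delay(...)` call. This is not the PR's code: `_DummyTask` is a stand-in for the real Celery tasks, the low-priority branch is reconstructed from the surrounding context of the series, and the `project`/`data` parameters of the real function are omitted for brevity.

from typing import Any, Optional


class _DummyTask:
    """Stand-in for a Celery task: records the kwargs it was enqueued with."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: list = []

    def delay(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)


symbolicate_event = _DummyTask("symbolicate_event")
symbolicate_event_low_priority = _DummyTask("symbolicate_event_low_priority")
symbolicate_event_from_reprocessing = _DummyTask("symbolicate_event_from_reprocessing")
symbolicate_event_from_reprocessing_low_priority = _DummyTask(
    "symbolicate_event_from_reprocessing_low_priority"
)


def submit_symbolicate(
    is_low_priority: bool,
    from_reprocessing: bool,
    cache_key: str,
    event_id: Optional[str],
    start_time: Optional[int],
) -> None:
    # Pick one of the four variants: (normal | low priority) x (ingest | reprocessing).
    if is_low_priority:
        task = (
            symbolicate_event_from_reprocessing_low_priority
            if from_reprocessing
            else symbolicate_event_low_priority
        )
    else:
        task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event

    # Without this enqueue nothing ever reaches a symbolication worker,
    # which is the gap patch 14 closes.
    task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id)


if __name__ == "__main__":
    submit_symbolicate(
        is_low_priority=True,
        from_reprocessing=False,
        cache_key="e:1",
        event_id="cc3e6c2bb6b6498097f336d1e6979f4b",
        start_time=0,
    )
    assert symbolicate_event_low_priority.calls == [
        {"cache_key": "e:1", "start_time": 0, "event_id": "cc3e6c2bb6b6498097f336d1e6979f4b"}
    ]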