From 28b35043b55219ee284d4b70175523981d63595e Mon Sep 17 00:00:00 2001 From: Kyle Wigley <9877221+kwigley@users.noreply.github.com> Date: Mon, 15 Jan 2024 17:55:50 -0500 Subject: [PATCH] feat(integrations): Add support for celery-redbeat cron tasks --- sentry_sdk/integrations/celery.py | 62 +++++++++++++++++++ setup.py | 1 + .../celery/test_celery_beat_crons.py | 54 ++++++++++++++++ 3 files changed, 117 insertions(+) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 0fd983de8d..f2e1aff48a 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -56,6 +56,11 @@ except ImportError: raise DidNotEnable("Celery not installed") +try: + from redbeat.schedulers import RedBeatScheduler # type: ignore +except ImportError: + RedBeatScheduler = None + CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject) @@ -76,6 +81,7 @@ def __init__( if monitor_beat_tasks: _patch_beat_apply_entry() + _patch_redbeat_maybe_due() _setup_celery_beat_signals() @staticmethod @@ -535,6 +541,62 @@ def sentry_apply_entry(*args, **kwargs): Scheduler.apply_entry = sentry_apply_entry +def _patch_redbeat_maybe_due(): + # type: () -> None + + if RedBeatScheduler is None: + return + + original_maybe_due = RedBeatScheduler.maybe_due + + def sentry_maybe_due(*args, **kwargs): + # type: (*Any, **Any) -> None + scheduler, schedule_entry = args + app = scheduler.app + + celery_schedule = schedule_entry.schedule + monitor_name = schedule_entry.name + + hub = Hub.current + integration = hub.get_integration(CeleryIntegration) + if integration is None: + return original_maybe_due(*args, **kwargs) + + if match_regex_list(monitor_name, integration.exclude_beat_tasks): + return original_maybe_due(*args, **kwargs) + + with hub.configure_scope() as scope: + # When tasks are started from Celery Beat, make sure each task has its own trace. + scope.set_new_propagation_context() + + monitor_config = _get_monitor_config(celery_schedule, app, monitor_name) + + is_supported_schedule = bool(monitor_config) + if is_supported_schedule: + headers = schedule_entry.options.pop("headers", {}) + headers.update( + { + "sentry-monitor-slug": monitor_name, + "sentry-monitor-config": monitor_config, + } + ) + + check_in_id = capture_checkin( + monitor_slug=monitor_name, + monitor_config=monitor_config, + status=MonitorStatus.IN_PROGRESS, + ) + headers.update({"sentry-monitor-check-in-id": check_in_id}) + + # Set the Sentry configuration in the options of the ScheduleEntry. + # Those will be picked up in `apply_async` and added to the headers. + schedule_entry.options["headers"] = headers + + return original_maybe_due(*args, **kwargs) + + RedBeatScheduler.maybe_due = sentry_maybe_due + + def _setup_celery_beat_signals(): # type: () -> None task_success.connect(crons_task_success) diff --git a/setup.py b/setup.py index bbaa98bbd2..000af9076c 100644 --- a/setup.py +++ b/setup.py @@ -50,6 +50,7 @@ def get_file_text(file_name): "beam": ["apache-beam>=2.12"], "bottle": ["bottle>=0.12.13"], "celery": ["celery>=3"], + "redbeat": ["celery-redbeat>=2"], "chalice": ["chalice>=1.16.0"], "clickhouse-driver": ["clickhouse-driver>=0.2.0"], "django": ["django>=1.8"], diff --git a/tests/integrations/celery/test_celery_beat_crons.py b/tests/integrations/celery/test_celery_beat_crons.py index 9343b3c926..9ffa59b00d 100644 --- a/tests/integrations/celery/test_celery_beat_crons.py +++ b/tests/integrations/celery/test_celery_beat_crons.py @@ -8,6 +8,7 @@ _get_humanized_interval, _get_monitor_config, _patch_beat_apply_entry, + _patch_redbeat_maybe_due, crons_task_success, crons_task_failure, crons_task_retry, @@ -447,3 +448,56 @@ def test_exclude_beat_tasks_option( # The original Scheduler.apply_entry() is called, AND _get_monitor_config is called. assert fake_apply_entry.call_count == 1 assert _get_monitor_config.call_count == 1 + + +@pytest.mark.parametrize( + "task_name,exclude_beat_tasks,task_in_excluded_beat_tasks", + [ + ["some_task_name", ["xxx", "some_task.*"], True], + ["some_task_name", ["xxx", "some_other_task.*"], False], + ], +) +def test_exclude_redbeat_tasks_option( + task_name, exclude_beat_tasks, task_in_excluded_beat_tasks +): + """ + Test excluding Celery RedBeat tasks from automatic instrumentation. + """ + fake_maybe_due = MagicMock() + + fake_redbeat_scheduler = MagicMock() + fake_redbeat_scheduler.maybe_due = fake_maybe_due + + fake_integration = MagicMock() + fake_integration.exclude_beat_tasks = exclude_beat_tasks + + fake_schedule_entry = MagicMock() + fake_schedule_entry.name = task_name + + fake_get_monitor_config = MagicMock() + + with mock.patch( + "sentry_sdk.integrations.celery.RedBeatScheduler", fake_redbeat_scheduler + ) as RedBeatScheduler: # noqa: N806 + with mock.patch( + "sentry_sdk.integrations.celery.Hub.current.get_integration", + return_value=fake_integration, + ): + with mock.patch( + "sentry_sdk.integrations.celery._get_monitor_config", + fake_get_monitor_config, + ) as _get_monitor_config: + # Mimic CeleryIntegration patching of RedBeatScheduler.maybe_due() + _patch_redbeat_maybe_due() + # Mimic Celery RedBeat calling a task from the RedBeat schedule + RedBeatScheduler.maybe_due(fake_redbeat_scheduler, fake_schedule_entry) + + if task_in_excluded_beat_tasks: + # Only the original RedBeatScheduler.maybe_due() is called, _get_monitor_config is NOT called. + assert fake_maybe_due.call_count == 1 + _get_monitor_config.assert_not_called() + + else: + # The original RedBeatScheduler.maybe_due() is called, AND _get_monitor_config is called. + assert fake_maybe_due.call_count == 1 + assert _get_monitor_config.call_count == 1