Skip to content

Commit

Permalink
feat(integrations): Add support for celery-redbeat cron tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
kwigley committed Jan 16, 2024
1 parent 2f05ccb commit bdaa390
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 0 deletions.
62 changes: 62 additions & 0 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
except ImportError:
raise DidNotEnable("Celery not installed")

try:
from redbeat.schedulers import RedBeatScheduler # type: ignore
except ImportError:
RedBeatScheduler = None


CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)

Expand All @@ -76,6 +81,7 @@ def __init__(

if monitor_beat_tasks:
_patch_beat_apply_entry()
_patch_redbeat_maybe_due()
_setup_celery_beat_signals()

@staticmethod
Expand Down Expand Up @@ -535,6 +541,62 @@ def sentry_apply_entry(*args, **kwargs):
Scheduler.apply_entry = sentry_apply_entry


def _patch_redbeat_maybe_due():
# type: () -> None

if RedBeatScheduler is None:
return

original_maybe_due = RedBeatScheduler.maybe_due

def sentry_maybe_due(*args, **kwargs):
# type: (*Any, **Any) -> None
scheduler, schedule_entry = args
app = scheduler.app

celery_schedule = schedule_entry.schedule
monitor_name = schedule_entry.name

hub = Hub.current
integration = hub.get_integration(CeleryIntegration)
if integration is None:
return original_maybe_due(*args, **kwargs)

if match_regex_list(monitor_name, integration.exclude_beat_tasks):
return original_maybe_due(*args, **kwargs)

with hub.configure_scope() as scope:
# When tasks are started from Celery Beat, make sure each task has its own trace.
scope.set_new_propagation_context()

monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)

check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers

return original_maybe_due(*args, **kwargs)

RedBeatScheduler.maybe_due = sentry_maybe_due


def _setup_celery_beat_signals():
# type: () -> None
task_success.connect(crons_task_success)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def get_file_text(file_name):
"beam": ["apache-beam>=2.12"],
"bottle": ["bottle>=0.12.13"],
"celery": ["celery>=3"],
"redbeat": ["celery-redbeat>=2"],
"chalice": ["chalice>=1.16.0"],
"clickhouse-driver": ["clickhouse-driver>=0.2.0"],
"django": ["django>=1.8"],
Expand Down
54 changes: 54 additions & 0 deletions tests/integrations/celery/test_celery_beat_crons.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
_get_humanized_interval,
_get_monitor_config,
_patch_beat_apply_entry,
_patch_redbeat_maybe_due,
crons_task_success,
crons_task_failure,
crons_task_retry,
Expand Down Expand Up @@ -447,3 +448,56 @@ def test_exclude_beat_tasks_option(
# The original Scheduler.apply_entry() is called, AND _get_monitor_config is called.
assert fake_apply_entry.call_count == 1
assert _get_monitor_config.call_count == 1


@pytest.mark.parametrize(
"task_name,exclude_beat_tasks,task_in_excluded_beat_tasks",
[
["some_task_name", ["xxx", "some_task.*"], True],
["some_task_name", ["xxx", "some_other_task.*"], False],
],
)
def test_exclude_redbeat_tasks_option(
task_name, exclude_beat_tasks, task_in_excluded_beat_tasks
):
"""
Test excluding Celery RedBeat tasks from automatic instrumentation.
"""
fake_maybe_due = MagicMock()

fake_redbeat_scheduler = MagicMock()
fake_redbeat_scheduler.maybe_due = fake_maybe_due

fake_integration = MagicMock()
fake_integration.exclude_beat_tasks = exclude_beat_tasks

fake_schedule_entry = MagicMock()
fake_schedule_entry.name = task_name

fake_get_monitor_config = MagicMock()

with mock.patch(
"sentry_sdk.integrations.celery.RedBeatScheduler", fake_redbeat_scheduler
) as RedBeatScheduler: # noqa: N806
with mock.patch(
"sentry_sdk.integrations.celery.Hub.current.get_integration",
return_value=fake_integration,
):
with mock.patch(
"sentry_sdk.integrations.celery._get_monitor_config",
fake_get_monitor_config,
) as _get_monitor_config:
# Mimic CeleryIntegration patching of RedBeatScheduler.maybe_due()
_patch_redbeat_maybe_due()
# Mimic Celery RedBeat calling a task from the RedBeat schedule
RedBeatScheduler.maybe_due(fake_redbeat_scheduler, fake_schedule_entry)

if task_in_excluded_beat_tasks:
# Only the original RedBeatScheduler.maybe_due() is called, _get_monitor_config is NOT called.
assert fake_maybe_due.call_count == 1
_get_monitor_config.assert_not_called()

else:
# The original RedBeatScheduler.maybe_due() is called, AND _get_monitor_config is called.
assert fake_maybe_due.call_count == 1
assert _get_monitor_config.call_count == 1

0 comments on commit bdaa390

Please sign in to comment.