Skip to content

Commit

Permalink
feat(crons): Implement fallback clock pulse task
Browse files Browse the repository at this point in the history
  • Loading branch information
evanpurkhiser committed Aug 15, 2023
1 parent 0b9b656 commit c66b615
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 6 deletions.
6 changes: 6 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,12 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"schedule": timedelta(seconds=30),
"options": {"expires": 30},
},
"monitors-clock-pulse": {
"task": "sentry.monitors.tasks.clock_pulse",
# Run every 1 minute
"schedule": crontab(minute="*/1"),
"options": {"expires": 60},
},
"monitors-temp-task-dispatcher": {
"task": "sentry.monitors.tasks.temp_task_dispatcher",
# Run every 1 minute
Expand Down
41 changes: 41 additions & 0 deletions src/sentry/monitors/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import logging
from datetime import datetime

import msgpack
import sentry_sdk
from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from django.utils import timezone

from sentry import options
from sentry.constants import ObjectStatus
from sentry.monitors.types import ClockPulseMessage
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics, redis
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

from .models import (
CheckInStatus,
Expand Down Expand Up @@ -48,6 +54,17 @@
MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"


def _get_monitor_checkin_producer() -> KafkaProducer:
cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
return KafkaProducer(build_kafka_configuration(default_config=producer_config))


_checkin_producer = SingletonProducer(_get_monitor_checkin_producer)


def _dispatch_tasks(ts: datetime):
"""
Dispatch monitor tasks triggered by the consumer clock.
Expand Down Expand Up @@ -129,6 +146,30 @@ def try_monitor_tasks_trigger(ts: datetime):
_dispatch_tasks(ts)


@instrumented_task(name="sentry.monitors.tasks.clock_pulse", silo_mode=SiloMode.REGION)
def clock_pulse(current_datetime=None):
"""
This task is run once a minute when to produce a 'clock pulse' into the
monitor ingest topic. This is to ensure there is always a message in the
topic that can drive the clock which dispatches the monitor tasks.
"""
if current_datetime is None:
current_datetime = timezone.now()

if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
# Directly trigger try_monitor_tasks_trigger in dev
try_monitor_tasks_trigger(current_datetime)
return

message: ClockPulseMessage = {
"message_type": "clock_pulse",
}

# Produce the pulse into the topic
payload = KafkaPayload(None, msgpack.packb(message), [])
_checkin_producer.produce(Topic(settings.KAFKA_INGEST_MONITORS), payload)


@instrumented_task(name="sentry.monitors.tasks.temp_task_dispatcher", silo_mode=SiloMode.REGION)
def temp_task_dispatcher():
"""
Expand Down
36 changes: 30 additions & 6 deletions tests/sentry/monitors/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from datetime import datetime, timedelta
from unittest.mock import patch
from unittest import mock

import msgpack
from arroyo.backends.kafka import KafkaPayload
from django.test import override_settings
from django.utils import timezone

from sentry.constants import ObjectStatus
Expand All @@ -13,7 +16,12 @@
MonitorType,
ScheduleType,
)
from sentry.monitors.tasks import check_missing, check_timeout, try_monitor_tasks_trigger
from sentry.monitors.tasks import (
check_missing,
check_timeout,
clock_pulse,
try_monitor_tasks_trigger,
)
from sentry.testutils.cases import TestCase


Expand Down Expand Up @@ -405,7 +413,7 @@ def test_timeout_via_max_runtime_configuration(self):
id=monitor_environment.id, status=MonitorStatus.TIMEOUT
).exists()

@patch("sentry.monitors.tasks.logger")
@mock.patch("sentry.monitors.tasks.logger")
def test_missed_exception_handling(self, logger):
org = self.create_organization()
project = self.create_project(organization=org)
Expand Down Expand Up @@ -458,7 +466,7 @@ def test_missed_exception_handling(self, logger):
monitor_environment=monitor_environment.id, status=CheckInStatus.MISSED
).exists()

@patch("sentry.monitors.tasks.logger")
@mock.patch("sentry.monitors.tasks.logger")
def test_timeout_exception_handling(self, logger):
org = self.create_organization()
project = self.create_project(organization=org)
Expand Down Expand Up @@ -548,7 +556,7 @@ def test_timeout_exception_handling(self, logger):
id=monitor_environment.id, status=MonitorStatus.TIMEOUT
).exists()

@patch("sentry.monitors.tasks._dispatch_tasks")
@mock.patch("sentry.monitors.tasks._dispatch_tasks")
def test_monitor_task_trigger(self, dispatch_tasks):
now = datetime.now().replace(second=0, microsecond=0)

Expand All @@ -575,7 +583,7 @@ def test_monitor_task_trigger(self, dispatch_tasks):
assert dispatch_tasks.call_count == 3
capture_message.assert_called_with("Monitor task dispatch minute skipped")

@patch("sentry.monitors.tasks._dispatch_tasks")
@mock.patch("sentry.monitors.tasks._dispatch_tasks")
def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
"""
When consumer partitions are not completely synchronized we may read
Expand All @@ -602,3 +610,19 @@ def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
# Fourth message moves past a new minute boundary, tick
try_monitor_tasks_trigger(ts=now + timedelta(minutes=1, seconds=1))
assert dispatch_tasks.call_count == 2

@override_settings(KAFKA_INGEST_MONITORS="monitors-test-topic")
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
@mock.patch("sentry.monitors.tasks._checkin_producer")
def test_clock_pulse(self, _checkin_producer):
clock_pulse()

assert _checkin_producer.produce.call_count == 1
assert _checkin_producer.produce.mock_calls[0] == mock.call(
mock.ANY,
KafkaPayload(
None,
msgpack.packb({"message_type": "clock_pulse"}),
[],
),
)

0 comments on commit c66b615

Please sign in to comment.