Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(crons): Implement fallback clock pulse task #54647

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,12 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"schedule": timedelta(seconds=30),
"options": {"expires": 30},
},
"monitors-clock-pulse": {
"task": "sentry.monitors.tasks.clock_pulse",
# Run every 1 minute
"schedule": crontab(minute="*/1"),
"options": {"expires": 60},
},
"monitors-temp-task-dispatcher": {
"task": "sentry.monitors.tasks.temp_task_dispatcher",
# Run every 1 minute
Expand Down
41 changes: 41 additions & 0 deletions src/sentry/monitors/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import logging
from datetime import datetime

import msgpack
import sentry_sdk
from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from django.utils import timezone

from sentry import options
from sentry.constants import ObjectStatus
from sentry.monitors.types import ClockPulseMessage
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics, redis
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

from .models import (
CheckInStatus,
Expand Down Expand Up @@ -48,6 +54,17 @@
MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"


def _get_monitor_checkin_producer() -> KafkaProducer:
cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
return KafkaProducer(build_kafka_configuration(default_config=producer_config))


_checkin_producer = SingletonProducer(_get_monitor_checkin_producer)


def _dispatch_tasks(ts: datetime):
"""
Dispatch monitor tasks triggered by the consumer clock.
Expand Down Expand Up @@ -129,6 +146,30 @@ def try_monitor_tasks_trigger(ts: datetime):
_dispatch_tasks(ts)


@instrumented_task(name="sentry.monitors.tasks.clock_pulse", silo_mode=SiloMode.REGION)
def clock_pulse(current_datetime=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this run conditionally based on a setting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, as per fillipos comment #53661 (comment), we decided to just always have it run. Then we don't have a "high" or "low" volume mode. it is just adaptive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is running all the time then doesn't it break the case where there's an ingest backlog?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because it is still putting the message into the kafka topic, thus into the queue.

This would be problematic if we turned off producers (relay)

"""
This task is run once a minute when to produce a 'clock pulse' into the
monitor ingest topic. This is to ensure there is always a message in the
topic that can drive the clock which dispatches the monitor tasks.
"""
if current_datetime is None:
current_datetime = timezone.now()

if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
# Directly trigger try_monitor_tasks_trigger in dev
try_monitor_tasks_trigger(current_datetime)
return

message: ClockPulseMessage = {
"message_type": "clock_pulse",
}

# Produce the pulse into the topic
payload = KafkaPayload(None, msgpack.packb(message), [])
_checkin_producer.produce(Topic(settings.KAFKA_INGEST_MONITORS), payload)


@instrumented_task(name="sentry.monitors.tasks.temp_task_dispatcher", silo_mode=SiloMode.REGION)
def temp_task_dispatcher():
"""
Expand Down
38 changes: 31 additions & 7 deletions tests/sentry/monitors/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from datetime import datetime, timedelta
from unittest.mock import patch
from unittest import mock

import msgpack
from arroyo.backends.kafka import KafkaPayload
from django.test import override_settings
from django.utils import timezone

from sentry.constants import ObjectStatus
Expand All @@ -13,7 +16,12 @@
MonitorType,
ScheduleType,
)
from sentry.monitors.tasks import check_missing, check_timeout, try_monitor_tasks_trigger
from sentry.monitors.tasks import (
check_missing,
check_timeout,
clock_pulse,
try_monitor_tasks_trigger,
)
from sentry.testutils.cases import TestCase


Expand Down Expand Up @@ -405,7 +413,7 @@ def test_timeout_via_max_runtime_configuration(self):
id=monitor_environment.id, status=MonitorStatus.TIMEOUT
).exists()

@patch("sentry.monitors.tasks.logger")
@mock.patch("sentry.monitors.tasks.logger")
def test_missed_exception_handling(self, logger):
org = self.create_organization()
project = self.create_project(organization=org)
Expand Down Expand Up @@ -458,7 +466,7 @@ def test_missed_exception_handling(self, logger):
monitor_environment=monitor_environment.id, status=CheckInStatus.MISSED
).exists()

@patch("sentry.monitors.tasks.logger")
@mock.patch("sentry.monitors.tasks.logger")
def test_timeout_exception_handling(self, logger):
org = self.create_organization()
project = self.create_project(organization=org)
Expand Down Expand Up @@ -548,7 +556,7 @@ def test_timeout_exception_handling(self, logger):
id=monitor_environment.id, status=MonitorStatus.TIMEOUT
).exists()

@patch("sentry.monitors.tasks._dispatch_tasks")
@mock.patch("sentry.monitors.tasks._dispatch_tasks")
def test_monitor_task_trigger(self, dispatch_tasks):
now = datetime.now().replace(second=0, microsecond=0)

Expand All @@ -569,13 +577,13 @@ def test_monitor_task_trigger(self, dispatch_tasks):
assert dispatch_tasks.call_count == 2

# A skipped minute trigges the task AND captures an error
with patch("sentry_sdk.capture_message") as capture_message:
with mock.patch("sentry_sdk.capture_message") as capture_message:
assert capture_message.call_count == 0
try_monitor_tasks_trigger(ts=now + timedelta(minutes=3, seconds=5))
assert dispatch_tasks.call_count == 3
capture_message.assert_called_with("Monitor task dispatch minute skipped")

@patch("sentry.monitors.tasks._dispatch_tasks")
@mock.patch("sentry.monitors.tasks._dispatch_tasks")
def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
"""
When consumer partitions are not completely synchronized we may read
Expand All @@ -602,3 +610,19 @@ def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
# Fourth message moves past a new minute boundary, tick
try_monitor_tasks_trigger(ts=now + timedelta(minutes=1, seconds=1))
assert dispatch_tasks.call_count == 2

@override_settings(KAFKA_INGEST_MONITORS="monitors-test-topic")
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
@mock.patch("sentry.monitors.tasks._checkin_producer")
def test_clock_pulse(self, _checkin_producer):
clock_pulse()

assert _checkin_producer.produce.call_count == 1
assert _checkin_producer.produce.mock_calls[0] == mock.call(
mock.ANY,
KafkaPayload(
None,
msgpack.packb({"message_type": "clock_pulse"}),
[],
),
)