From 5db99126bd6106cbd95cd45c216a9b5377bbf414 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Wed, 20 Nov 2024 11:59:48 -0500 Subject: [PATCH 1/4] fix(taskbroker): Add at_most_once support If a task is marked as at_most_once, then check the cache to see if the task has already been seen before. If it has, assume the task has already been executed and continue. Otherwise store the task ID and execute the task. --- requirements-base.txt | 2 +- requirements-dev-frozen.txt | 4 +-- requirements-dev.txt | 2 +- requirements-frozen.txt | 2 +- src/sentry/taskworker/registry.py | 2 ++ src/sentry/taskworker/task.py | 2 ++ src/sentry/taskworker/worker.py | 34 ++++++++++++++++++++----- tests/sentry/taskworker/test_worker.py | 35 +++++++++++++++++++++++--- 8 files changed, 69 insertions(+), 14 deletions(-) diff --git a/requirements-base.txt b/requirements-base.txt index 6ea0481cb0af49..1f8f38afef2f74 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -68,7 +68,7 @@ rfc3986-validator>=0.1.1 sentry-arroyo>=2.16.5 sentry-kafka-schemas>=0.1.120 sentry-ophio==1.0.0 -sentry-protos>=0.1.34 +sentry-protos>=0.1.36 sentry-redis-tools>=0.1.7 sentry-relay>=0.9.3 sentry-sdk[http2]>=2.18.0 diff --git a/requirements-dev-frozen.txt b/requirements-dev-frozen.txt index ddf67a92b8915b..78d9dc4019a82a 100644 --- a/requirements-dev-frozen.txt +++ b/requirements-dev-frozen.txt @@ -146,7 +146,7 @@ pyflakes==3.2.0 pyjwt==2.4.0 pymemcache==4.0.0 pysocks==1.7.1 -pytest==8.1.2 +pytest==8.3.3 pytest-cov==4.0.0 pytest-django==4.9.0 pytest-fail-slow==0.3.0 @@ -188,7 +188,7 @@ sentry-forked-django-stubs==5.1.1.post1 sentry-forked-djangorestframework-stubs==3.15.1.post2 sentry-kafka-schemas==0.1.120 sentry-ophio==1.0.0 -sentry-protos==0.1.34 +sentry-protos==0.1.36 sentry-redis-tools==0.1.7 sentry-relay==0.9.3 sentry-sdk==2.18.0 diff --git a/requirements-dev.txt b/requirements-dev.txt index 6d1941de53e279..5e01bcaaf657c3 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,7 +10,7 @@ time-machine>=2.16.0 honcho>=2 openapi-core>=0.18.2 openapi-pydantic>=0.4.0 -pytest>=8.1 +pytest>=8.3 pytest-cov>=4.0.0 pytest-django>=4.9.0 pytest-fail-slow>=0.3.0 diff --git a/requirements-frozen.txt b/requirements-frozen.txt index 0dff6e9bf70562..3c900337513271 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -127,7 +127,7 @@ s3transfer==0.10.0 sentry-arroyo==2.16.5 sentry-kafka-schemas==0.1.120 sentry-ophio==1.0.0 -sentry-protos==0.1.34 +sentry-protos==0.1.36 sentry-redis-tools==0.1.7 sentry-relay==0.9.3 sentry-sdk==2.18.0 diff --git a/src/sentry/taskworker/registry.py b/src/sentry/taskworker/registry.py index 96aae140d095b1..7d93fdef05fbc5 100644 --- a/src/sentry/taskworker/registry.py +++ b/src/sentry/taskworker/registry.py @@ -71,6 +71,7 @@ def register( retry: Retry | None = None, expires: int | datetime.timedelta | None = None, processing_deadline_duration: int | datetime.timedelta | None = None, + at_most_once: bool = False, ) -> Callable[[Callable[P, R]], Task[P, R]]: """register a task, used as a decorator""" @@ -84,6 +85,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]: processing_deadline_duration=( processing_deadline_duration or self.default_processing_deadline_duration ), + at_most_once=at_most_once, ) # TODO(taskworker) tasks should be registered into the registry # so that we can ensure task names are globally unique diff --git a/src/sentry/taskworker/task.py b/src/sentry/taskworker/task.py index 0d60884d850406..13d7dfb82bdfa7 100644 --- a/src/sentry/taskworker/task.py +++ b/src/sentry/taskworker/task.py @@ -32,6 +32,7 @@ def __init__( retry: Retry | None = None, expires: int | datetime.timedelta | None = None, processing_deadline_duration: int | datetime.timedelta | None = None, + at_most_once: bool = False, ): # TODO(taskworker) Implement task execution deadlines self.name = name @@ -42,6 +43,7 @@ def __init__( self._processing_deadline_duration = ( processing_deadline_duration or DEFAULT_PROCESSING_DEADLINE ) + self.at_most_once = at_most_once update_wrapper(self, func) @property diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index 6b2f57cb0ee83a..740052413b9f43 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -11,6 +11,7 @@ import grpc import orjson from django.conf import settings +from django.core.cache import cache from sentry_protos.sentry.v1.taskworker_pb2 import ( TASK_ACTIVATION_STATUS_COMPLETE, TASK_ACTIVATION_STATUS_FAILURE, @@ -20,6 +21,7 @@ from sentry.taskworker.client import TaskworkerClient from sentry.taskworker.registry import taskregistry +from sentry.taskworker.task import Task from sentry.utils import metrics logger = logging.getLogger("sentry.taskworker.worker") @@ -35,6 +37,14 @@ def _process_activation( taskregistry.get(namespace).get(task_name)(*args, **kwargs) +AT_MOST_ONCE_TIMEOUT = 60 * 60 * 24 # 1 day + + +def get_at_most_once_key(namespace: str, taskname: str, task_id: str) -> str: + # tw:amo -> taskworker:at_most_once + return f"tw:amo:{namespace}:{taskname}:{task_id}" + + class TaskWorker: """ A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations. @@ -128,13 +138,13 @@ def fetch_task(self) -> TaskActivation | None: metrics.incr("taskworker.worker.get_task.success") return activation - def _known_task(self, activation: TaskActivation) -> bool: + def _get_known_task(self, activation: TaskActivation) -> Task | None: if not taskregistry.contains(activation.namespace): logger.error( "taskworker.invalid_namespace", extra={"namespace": activation.namespace, "taskname": activation.taskname}, ) - return False + return None namespace = taskregistry.get(activation.namespace) if not namespace.contains(activation.taskname): @@ -142,12 +152,13 @@ def _known_task(self, activation: TaskActivation) -> bool: "taskworker.invalid_taskname", extra={"namespace": activation.namespace, "taskname": activation.taskname}, ) - return False - return True + return None + return namespace.get(activation.taskname) def process_task(self, activation: TaskActivation) -> TaskActivation | None: assert self._pool - if not self._known_task(activation): + task = self._get_known_task(activation) + if not task: metrics.incr( "taskworker.worker.unknown_task", tags={"namespace": activation.namespace, "taskname": activation.taskname}, @@ -158,7 +169,18 @@ def process_task(self, activation: TaskActivation) -> TaskActivation | None: status=TASK_ACTIVATION_STATUS_FAILURE, ) - # TODO(taskworker): Add at_most_once checks + if task.at_most_once: + key = get_at_most_once_key(activation.namespace, activation.taskname, activation.id) + if cache.add(key, "1", timeout=AT_MOST_ONCE_TIMEOUT): # The key didn't exist + metrics.incr( + "taskworker.task.at_most_once.executed", tags={"task": activation.taskname} + ) + else: + metrics.incr( + "taskworker.worker.at_most_once.skipped", tags={"task": activation.taskname} + ) + return None + processing_timeout = activation.processing_deadline_duration namespace = taskregistry.get(activation.namespace) next_state = TASK_ACTIVATION_STATUS_FAILURE diff --git a/tests/sentry/taskworker/test_worker.py b/tests/sentry/taskworker/test_worker.py index e8c2f5154ff30c..e7d9c11eeeddfc 100644 --- a/tests/sentry/taskworker/test_worker.py +++ b/tests/sentry/taskworker/test_worker.py @@ -34,6 +34,11 @@ def fail_task(): raise ValueError("nope") +@test_namespace.register(name="test.at_most_once", at_most_once=True) +def at_most_once_task(): + pass + + SIMPLE_TASK = TaskActivation( id="111", taskname="test.simple_task", @@ -66,6 +71,14 @@ def fail_task(): processing_deadline_duration=1, ) +AT_MOST_ONCE_TASK = TaskActivation( + id="555", + taskname="test.at_most_once", + namespace="tests", + parameters='{"args": [], "kwargs": {}}', + processing_deadline_duration=1, +) + @override_settings(TASKWORKER_IMPORTS=("tests.sentry.taskworker.test_worker",)) class TestTaskWorker(TestCase): @@ -118,16 +131,32 @@ def test_process_task_retry(self) -> None: def test_process_task_failure(self) -> None: taskworker = TaskWorker(rpc_host="127.0.0.1:50051", max_task_count=100) - with mock.patch.object(taskworker.client, "update_task") as mock_update_task: - mock_update_task.return_value = SIMPLE_TASK + with mock.patch.object(taskworker.client, "update_task") as mock_update: + mock_update.return_value = SIMPLE_TASK result = taskworker.process_task(FAIL_TASK) - mock_update_task.assert_called_with( + mock_update.assert_called_with( task_id=FAIL_TASK.id, status=TASK_ACTIVATION_STATUS_FAILURE ) assert result assert result.id == SIMPLE_TASK.id + def test_process_task_at_most_once(self) -> None: + taskworker = TaskWorker(rpc_host="127.0.0.1:50051", max_task_count=100) + with mock.patch.object(taskworker.client, "update_task") as mock_update: + mock_update.return_value = SIMPLE_TASK + result = taskworker.process_task(AT_MOST_ONCE_TASK) + + mock_update.assert_called_with( + task_id=AT_MOST_ONCE_TASK.id, status=TASK_ACTIVATION_STATUS_COMPLETE + ) + assert taskworker.process_task(AT_MOST_ONCE_TASK) is None + assert result + assert result.id == SIMPLE_TASK.id + + result = taskworker.process_task(AT_MOST_ONCE_TASK) + assert result is None + def test_start_max_task_count(self) -> None: taskworker = TaskWorker(rpc_host="127.0.0.1:50051", max_task_count=1) with mock.patch.object(taskworker, "client") as mock_client: From bcdbf9f8342b75b65f46ee951fa9b9683cc99e15 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Mon, 25 Nov 2024 14:58:20 -0500 Subject: [PATCH 2/4] fix typing --- src/sentry/taskworker/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index 740052413b9f43..bef131a61298de 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -138,7 +138,7 @@ def fetch_task(self) -> TaskActivation | None: metrics.incr("taskworker.worker.get_task.success") return activation - def _get_known_task(self, activation: TaskActivation) -> Task | None: + def _get_known_task(self, activation: TaskActivation) -> Task[Any, Any] | None: if not taskregistry.contains(activation.namespace): logger.error( "taskworker.invalid_namespace", From 628c362035721961d5aa85bdb4af06bfad3a7199 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Mon, 25 Nov 2024 15:18:47 -0500 Subject: [PATCH 3/4] update to propagate at_most_once --- requirements-base.txt | 2 +- src/sentry/taskworker/task.py | 5 ++++- tests/sentry/taskworker/test_task.py | 31 ++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/requirements-base.txt b/requirements-base.txt index 1f8f38afef2f74..3fe990ec7d9918 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -68,7 +68,7 @@ rfc3986-validator>=0.1.1 sentry-arroyo>=2.16.5 sentry-kafka-schemas>=0.1.120 sentry-ophio==1.0.0 -sentry-protos>=0.1.36 +sentry-protos>=0.1.37 sentry-redis-tools>=0.1.7 sentry-relay>=0.9.3 sentry-sdk[http2]>=2.18.0 diff --git a/src/sentry/taskworker/task.py b/src/sentry/taskworker/task.py index 13d7dfb82bdfa7..e44d8aa59b61b5 100644 --- a/src/sentry/taskworker/task.py +++ b/src/sentry/taskworker/task.py @@ -94,8 +94,11 @@ def _create_retry_state(self) -> RetryState: attempts=0, kind="sentry.taskworker.retry.Retry", discard_after_attempt=1, + at_most_once=self.at_most_once, ) - return retry.initial_state() + retry_state = retry.initial_state() + retry_state.at_most_once = self.at_most_once + return retry_state def should_retry(self, state: RetryState, exc: Exception) -> bool: # No retry policy means no retries. diff --git a/tests/sentry/taskworker/test_task.py b/tests/sentry/taskworker/test_task.py index c6dd566f6f8216..8367a3bce15e2b 100644 --- a/tests/sentry/taskworker/test_task.py +++ b/tests/sentry/taskworker/test_task.py @@ -110,6 +110,22 @@ def test_create_activation(task_namespace: TaskNamespace) -> None: processing_deadline_duration=30, ) + at_most_once_task = Task( + name="test.at_most_once", + func=do_things, + namespace=task_namespace, + at_most_once=True, + ) + + retry = Retry(times=3, times_exceeded=LastAction.Deadletter) + retry_at_most_once_task = Task( + name="test.with_retry_at_most_once", + func=do_things, + namespace=task_namespace, + retry=retry, + at_most_once=True, + ) + # No retries will be made as there is no retry policy on the task or namespace. activation = no_retry_task.create_activation() assert activation.taskname == "test.no_retry" @@ -136,6 +152,21 @@ def test_create_activation(task_namespace: TaskNamespace) -> None: assert activation.expires == 300 assert activation.processing_deadline_duration == 30 + activation = at_most_once_task.create_activation() + assert activation.taskname == "test.at_most_once" + assert activation.namespace == task_namespace.name + assert activation.retry_state + assert activation.retry_state.at_most_once is True + + activation = retry_at_most_once_task.create_activation() + assert activation.taskname == "test.with_retry_at_most_once" + assert activation.namespace == task_namespace.name + assert activation.retry_state + assert activation.retry_state.at_most_once is True + assert activation.retry_state.attempts == 0 + assert activation.retry_state.discard_after_attempt == 0 + assert activation.retry_state.deadletter_after_attempt == 3 + def test_create_activation_parameters(task_namespace: TaskNamespace) -> None: @task_namespace.register(name="test.parameters") From 16e8e84f355e1224e4b755d69153f2608077b9ca Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:38:05 +0000 Subject: [PATCH 4/4] :snowflake: re-freeze requirements --- requirements-dev-frozen.txt | 2 +- requirements-frozen.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements-dev-frozen.txt b/requirements-dev-frozen.txt index 78d9dc4019a82a..17b19f48dc53e2 100644 --- a/requirements-dev-frozen.txt +++ b/requirements-dev-frozen.txt @@ -188,7 +188,7 @@ sentry-forked-django-stubs==5.1.1.post1 sentry-forked-djangorestframework-stubs==3.15.1.post2 sentry-kafka-schemas==0.1.120 sentry-ophio==1.0.0 -sentry-protos==0.1.36 +sentry-protos==0.1.37 sentry-redis-tools==0.1.7 sentry-relay==0.9.3 sentry-sdk==2.18.0 diff --git a/requirements-frozen.txt b/requirements-frozen.txt index 3c900337513271..26164f4a387bd0 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -127,7 +127,7 @@ s3transfer==0.10.0 sentry-arroyo==2.16.5 sentry-kafka-schemas==0.1.120 sentry-ophio==1.0.0 -sentry-protos==0.1.36 +sentry-protos==0.1.37 sentry-redis-tools==0.1.7 sentry-relay==0.9.3 sentry-sdk==2.18.0