Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements-base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ rfc3986-validator>=0.1.1
sentry-arroyo>=2.16.5
sentry-kafka-schemas>=0.1.120
sentry-ophio==1.0.0
sentry-protos>=0.1.34
sentry-protos>=0.1.37
sentry-redis-tools>=0.1.7
sentry-relay>=0.9.3
sentry-sdk[http2]>=2.18.0
Expand Down
4 changes: 2 additions & 2 deletions requirements-dev-frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pyflakes==3.2.0
pyjwt==2.4.0
pymemcache==4.0.0
pysocks==1.7.1
pytest==8.1.2
pytest==8.3.3
pytest-cov==4.0.0
pytest-django==4.9.0
pytest-fail-slow==0.3.0
Expand Down Expand Up @@ -188,7 +188,7 @@ sentry-forked-django-stubs==5.1.1.post1
sentry-forked-djangorestframework-stubs==3.15.1.post2
sentry-kafka-schemas==0.1.120
sentry-ophio==1.0.0
sentry-protos==0.1.34
sentry-protos==0.1.37
sentry-redis-tools==0.1.7
sentry-relay==0.9.3
sentry-sdk==2.18.0
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ time-machine>=2.16.0
honcho>=2
openapi-core>=0.18.2
openapi-pydantic>=0.4.0
pytest>=8.1
pytest>=8.3
pytest-cov>=4.0.0
pytest-django>=4.9.0
pytest-fail-slow>=0.3.0
Expand Down
2 changes: 1 addition & 1 deletion requirements-frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ s3transfer==0.10.0
sentry-arroyo==2.16.5
sentry-kafka-schemas==0.1.120
sentry-ophio==1.0.0
sentry-protos==0.1.34
sentry-protos==0.1.37
sentry-redis-tools==0.1.7
sentry-relay==0.9.3
sentry-sdk==2.18.0
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/taskworker/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def register(
retry: Retry | None = None,
expires: int | datetime.timedelta | None = None,
processing_deadline_duration: int | datetime.timedelta | None = None,
at_most_once: bool = False,
) -> Callable[[Callable[P, R]], Task[P, R]]:
"""register a task, used as a decorator"""

Expand All @@ -84,6 +85,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
processing_deadline_duration=(
processing_deadline_duration or self.default_processing_deadline_duration
),
at_most_once=at_most_once,
)
# TODO(taskworker) tasks should be registered into the registry
# so that we can ensure task names are globally unique
Expand Down
7 changes: 6 additions & 1 deletion src/sentry/taskworker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
retry: Retry | None = None,
expires: int | datetime.timedelta | None = None,
processing_deadline_duration: int | datetime.timedelta | None = None,
at_most_once: bool = False,
):
# TODO(taskworker) Implement task execution deadlines
self.name = name
Expand All @@ -42,6 +43,7 @@ def __init__(
self._processing_deadline_duration = (
processing_deadline_duration or DEFAULT_PROCESSING_DEADLINE
)
self.at_most_once = at_most_once
update_wrapper(self, func)

@property
Expand Down Expand Up @@ -92,8 +94,11 @@ def _create_retry_state(self) -> RetryState:
attempts=0,
kind="sentry.taskworker.retry.Retry",
discard_after_attempt=1,
at_most_once=self.at_most_once,
)
return retry.initial_state()
retry_state = retry.initial_state()
retry_state.at_most_once = self.at_most_once
return retry_state

def should_retry(self, state: RetryState, exc: Exception) -> bool:
# No retry policy means no retries.
Expand Down
34 changes: 28 additions & 6 deletions src/sentry/taskworker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import grpc
import orjson
from django.conf import settings
from django.core.cache import cache
from sentry_protos.sentry.v1.taskworker_pb2 import (
TASK_ACTIVATION_STATUS_COMPLETE,
TASK_ACTIVATION_STATUS_FAILURE,
Expand All @@ -20,6 +21,7 @@

from sentry.taskworker.client import TaskworkerClient
from sentry.taskworker.registry import taskregistry
from sentry.taskworker.task import Task
from sentry.utils import metrics

logger = logging.getLogger("sentry.taskworker.worker")
Expand All @@ -35,6 +37,14 @@ def _process_activation(
taskregistry.get(namespace).get(task_name)(*args, **kwargs)


AT_MOST_ONCE_TIMEOUT = 60 * 60 * 24 # 1 day


def get_at_most_once_key(namespace: str, taskname: str, task_id: str) -> str:
    """Return the cache key used to mark an at-most-once task activation.

    The key is the fixed prefix ``tw:amo`` (short for
    ``taskworker:at_most_once``) followed by the namespace, the task name
    and the activation id, all joined with colons.
    """
    key_template = "tw:amo:{}:{}:{}"
    return key_template.format(namespace, taskname, task_id)


class TaskWorker:
"""
A TaskWorker fetches tasks from a taskworker RPC host and handles executing task activations.
Expand Down Expand Up @@ -128,26 +138,27 @@ def fetch_task(self) -> TaskActivation | None:
metrics.incr("taskworker.worker.get_task.success")
return activation

def _known_task(self, activation: TaskActivation) -> bool:
def _get_known_task(self, activation: TaskActivation) -> Task[Any, Any] | None:
if not taskregistry.contains(activation.namespace):
logger.error(
"taskworker.invalid_namespace",
extra={"namespace": activation.namespace, "taskname": activation.taskname},
)
return False
return None

namespace = taskregistry.get(activation.namespace)
if not namespace.contains(activation.taskname):
logger.error(
"taskworker.invalid_taskname",
extra={"namespace": activation.namespace, "taskname": activation.taskname},
)
return False
return True
return None
return namespace.get(activation.taskname)

def process_task(self, activation: TaskActivation) -> TaskActivation | None:
assert self._pool
if not self._known_task(activation):
task = self._get_known_task(activation)
if not task:
metrics.incr(
"taskworker.worker.unknown_task",
tags={"namespace": activation.namespace, "taskname": activation.taskname},
Expand All @@ -158,7 +169,18 @@ def process_task(self, activation: TaskActivation) -> TaskActivation | None:
status=TASK_ACTIVATION_STATUS_FAILURE,
)

# TODO(taskworker): Add at_most_once checks
if task.at_most_once:
key = get_at_most_once_key(activation.namespace, activation.taskname, activation.id)
if cache.add(key, "1", timeout=AT_MOST_ONCE_TIMEOUT): # The key didn't exist
metrics.incr(
"taskworker.task.at_most_once.executed", tags={"task": activation.taskname}
)
else:
metrics.incr(
"taskworker.worker.at_most_once.skipped", tags={"task": activation.taskname}
)
return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could reply to the broker here that the task is 'complete'. That could help prevent the task from being given to another worker as this worker would appear to be 'dead' due to no response.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to do that, because the task isn't necessarily complete, it might be running elsewhere.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, the task could be running in another worker. The scenario I was concerned about is:

  1. Worker A picks up an at_most_once task. The task will be status=processing
  2. Worker A dies and can't send an update to taskbroker.
  3. The task will exceed its processing deadline, and be put back into pending.
  4. Worker B takes the task and returns early without any updates. The task will once again exceed its processing_deadline, and we'll burn worker time looping from steps 2 to 4 until the message is deadlettered.

We could have another status/state for 'failed because of idempotency' that could escape the loop. I haven't thought through how else that status should be materially different from failure though.

Copy link
Member Author

@evanh evanh Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the worker marked the task completed as soon as it added the task to the idempotency cache? There's no way for the broker to safely assign the task again after it has been assigned once. And if the task is failing before/during setting the ID, we know that no actual work has been done, so another worker could still execute it.

The issue here is that if the worker fails without updating the status to failure, the task won't be deadlettered because the broker thinks it was completed.

Copy link
Member Author

@evanh evanh Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative to this is to propagate the at_most_once state into the protobuf. Then the broker will know to never retry an at_most_once task, and it can send it immediately to the deadletter queue on a processing deadline.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Including at_most_once in the protobuf would let us solve processing_deadlines more efficiently as the broker could skip doing deadline retries if the worker never responds, and regular retries could continue to work correctly, despite retries + at_most_once being nonsense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that, I think this PR is OK to be merged as is. If a worker happens to get assigned a task that is in the cache, it should skip it.

There will be a followup PR in the taskbroker to do some checks to avoid that scenario.


processing_timeout = activation.processing_deadline_duration
namespace = taskregistry.get(activation.namespace)
next_state = TASK_ACTIVATION_STATUS_FAILURE
Expand Down
31 changes: 31 additions & 0 deletions tests/sentry/taskworker/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ def test_create_activation(task_namespace: TaskNamespace) -> None:
processing_deadline_duration=30,
)

at_most_once_task = Task(
name="test.at_most_once",
func=do_things,
namespace=task_namespace,
at_most_once=True,
)

retry = Retry(times=3, times_exceeded=LastAction.Deadletter)
retry_at_most_once_task = Task(
name="test.with_retry_at_most_once",
func=do_things,
namespace=task_namespace,
retry=retry,
at_most_once=True,
)

# No retries will be made as there is no retry policy on the task or namespace.
activation = no_retry_task.create_activation()
assert activation.taskname == "test.no_retry"
Expand All @@ -136,6 +152,21 @@ def test_create_activation(task_namespace: TaskNamespace) -> None:
assert activation.expires == 300
assert activation.processing_deadline_duration == 30

activation = at_most_once_task.create_activation()
assert activation.taskname == "test.at_most_once"
assert activation.namespace == task_namespace.name
assert activation.retry_state
assert activation.retry_state.at_most_once is True

activation = retry_at_most_once_task.create_activation()
assert activation.taskname == "test.with_retry_at_most_once"
assert activation.namespace == task_namespace.name
assert activation.retry_state
assert activation.retry_state.at_most_once is True
assert activation.retry_state.attempts == 0
assert activation.retry_state.discard_after_attempt == 0
assert activation.retry_state.deadletter_after_attempt == 3


def test_create_activation_parameters(task_namespace: TaskNamespace) -> None:
@task_namespace.register(name="test.parameters")
Expand Down
35 changes: 32 additions & 3 deletions tests/sentry/taskworker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ def fail_task():
raise ValueError("nope")


@test_namespace.register(name="test.at_most_once", at_most_once=True)
def at_most_once_task():
pass


SIMPLE_TASK = TaskActivation(
id="111",
taskname="test.simple_task",
Expand Down Expand Up @@ -66,6 +71,14 @@ def fail_task():
processing_deadline_duration=1,
)

AT_MOST_ONCE_TASK = TaskActivation(
id="555",
taskname="test.at_most_once",
namespace="tests",
parameters='{"args": [], "kwargs": {}}',
processing_deadline_duration=1,
)


@override_settings(TASKWORKER_IMPORTS=("tests.sentry.taskworker.test_worker",))
class TestTaskWorker(TestCase):
Expand Down Expand Up @@ -118,16 +131,32 @@ def test_process_task_retry(self) -> None:

def test_process_task_failure(self) -> None:
taskworker = TaskWorker(rpc_host="127.0.0.1:50051", max_task_count=100)
with mock.patch.object(taskworker.client, "update_task") as mock_update_task:
mock_update_task.return_value = SIMPLE_TASK
with mock.patch.object(taskworker.client, "update_task") as mock_update:
mock_update.return_value = SIMPLE_TASK
result = taskworker.process_task(FAIL_TASK)

mock_update_task.assert_called_with(
mock_update.assert_called_with(
task_id=FAIL_TASK.id, status=TASK_ACTIVATION_STATUS_FAILURE
)
assert result
assert result.id == SIMPLE_TASK.id

def test_process_task_at_most_once(self) -> None:
taskworker = TaskWorker(rpc_host="127.0.0.1:50051", max_task_count=100)
with mock.patch.object(taskworker.client, "update_task") as mock_update:
mock_update.return_value = SIMPLE_TASK
result = taskworker.process_task(AT_MOST_ONCE_TASK)

mock_update.assert_called_with(
task_id=AT_MOST_ONCE_TASK.id, status=TASK_ACTIVATION_STATUS_COMPLETE
)
assert taskworker.process_task(AT_MOST_ONCE_TASK) is None
assert result
assert result.id == SIMPLE_TASK.id

result = taskworker.process_task(AT_MOST_ONCE_TASK)
assert result is None

def test_start_max_task_count(self) -> None:
taskworker = TaskWorker(rpc_host="127.0.0.1:50051", max_task_count=1)
with mock.patch.object(taskworker, "client") as mock_client:
Expand Down
Loading