Skip to content

Commit

Permalink
Tidy up h's Celery configuration
Browse files Browse the repository at this point in the history
1. Add a comment clarifying the purpose of the
   `broker_transport_options` that we use (`max_retries`,
   `interval_start`, and `interval_step`)
2. Remove the `interval_max` option because this isn't necessary and has
   no effect when `max_retries` is used
3. Don't use `RETRY_POLICY_QUICK` when passing
   `broker_transport_options`: it is the same dict of options but broker
   transport options aren't the same thing as a retry policy, I think
   for clarity it's best not to confuse the two
4. Remove `task_acks_late=True`, use early-acknowledgement by default
   (Celery's default behaviour). I've added `acks_late=True` to the
   tasks that I think should be using late-acknowledgement. This does
   mean that this commit changes some tasks from late- to
   early-acknowledgement when I judged that early-acknowledgement was
   acceptable for these tasks.
5. Remove `accept_content=["json"]`: this is the default value
   (see: https://docs.celeryq.dev/en/stable/userguide/configuration.html#accept-content)
6. Remove `task_ignore_result=True`: I don't think this does anything
   since we don't have a results backend configured
7. Remove `task_serializer="json"`: this is the default value
   (see: https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-task_serializer)
8. Remove `worker_prefetch_multiplier=1`. This defaults to `4` which
   the Celery docs say is usually a good choice unless you have very
   long-running tasks (which we don't). I don't see any reason why we
   should change this from the default and the comment says that we set
   it to `1` just because it matches the behaviour of NSQ (which we used
   before Celery) which doesn't seem like a good reason. See:
   https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-worker_prefetch_multiplier
  • Loading branch information
seanh committed Apr 27, 2023
1 parent b3c9409 commit 1f2b322
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 38 deletions.
36 changes: 21 additions & 15 deletions h/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

__all__ = ("celery", "get_task_logger")

from h.tasks import RETRY_POLICY_QUICK

log = logging.getLogger(__name__)

celery = Celery("h")
Expand All @@ -25,14 +23,27 @@
"CELERY_BROKER_URL",
os.environ.get("BROKER_URL", "amqp://guest:guest@localhost:5672//"),
),
# What options should we have when sending messages to the queue?
broker_transport_options=RETRY_POLICY_QUICK,
accept_content=["json"],
# Enable at-least-once delivery mode. This probably isn't actually what we
# want for all of our queues, but it makes the failure-mode behaviour of
# Celery the same as our old NSQ worker:
task_acks_late=True,
task_ignore_result=True,
broker_transport_options={
# Celery's docs are very unclear about this but: when publishing a
# message to RabbitMQ these options end up getting passed to Kombu's
# _ensure_connection() function:
# https://github.com/celery/kombu/blob/3e098dc94ed2a389276ccf3606a0ded3da157d72/kombu/connection.py#L399-L453
#
# By default _ensure_connection() can spend over 6s trying to establish
# a connection to RabbitMQ if RabbitMQ is down. This means that if
# RabbitMQ goes down then all of our web processes can quickly become
# occupied trying to establish connections when web requests try to
# call Celery tasks with .delay() or .apply_async().
#
# These options change it to use a smaller number of retries and less
# time between retries so that attempts fail fast when RabbitMQ is down
# and our whole web app remains responsive.
#
# For more info see: https://github.com/celery/celery/issues/4627#issuecomment-396907957
"max_retries": 2,
"interval_start": 0.2,
"interval_step": 0.2,
},
imports=(
"h.tasks.cleanup",
"h.tasks.indexer",
Expand All @@ -46,7 +57,6 @@
"h.tasks.indexer.add_users_annotations": "indexer",
"h.tasks.indexer.delete_annotation": "indexer",
},
task_serializer="json",
task_queues=[
Queue(
"celery",
Expand All @@ -61,10 +71,6 @@
exchange=Exchange("indexer", type="direct", durable=True),
),
],
# Only accept one task at a time. This also probably isn't what we want
# (especially not for, say, a search indexer task) but it makes the
# behaviour consistent with the previous NSQ-based worker:
worker_prefetch_multiplier=1,
)


Expand Down
13 changes: 0 additions & 13 deletions h/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
"""Background worker task definitions for the h application."""

# These are retry policies which can be used as any of:
# * Transport options for connections
# * Transport options for celery
# * Retry policy for queueing messages
# * Retry policy for delaying tasks


RETRY_POLICY_QUICK = {
"max_retries": 2,
# The delay until the first retry
"interval_start": 0.2,
# How many seconds added to the interval for each retry
"interval_step": 0.2,
# Maximum number of seconds to sleep between each retry
"interval_max": 0.6,
}

RETRY_POLICY_VERY_QUICK = {
Expand All @@ -23,6 +12,4 @@
"interval_start": 0,
# How many seconds added to the interval for each retry
"interval_step": 0.1,
# Maximum number of seconds to sleep between each retry
"interval_max": 0.3,
}
10 changes: 5 additions & 5 deletions h/tasks/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
log = get_task_logger(__name__)


@celery.task(acks_late=False)
@celery.task
def purge_deleted_annotations():
"""
Remove annotations marked as deleted from the database.
Expand All @@ -23,29 +23,29 @@ def purge_deleted_annotations():
).delete()


@celery.task(acks_late=False)
@celery.task
def purge_expired_auth_tickets():
    """Delete auth tickets whose expiry time has already passed."""
    db = celery.request.db
    db.query(models.AuthTicket).filter(
        models.AuthTicket.expires < datetime.utcnow()
    ).delete()


@celery.task(acks_late=False)
@celery.task
def purge_expired_authz_codes():
    """Delete authorization codes whose expiry time has already passed."""
    is_expired = models.AuthzCode.expires < datetime.utcnow()
    celery.request.db.query(models.AuthzCode).filter(is_expired).delete()


@celery.task(acks_late=False)
@celery.task
def purge_expired_tokens():
    """Delete tokens whose access and refresh expiry times have both passed."""
    cutoff = datetime.utcnow()
    tokens = celery.request.db.query(models.Token)
    tokens.filter(
        models.Token.expires < cutoff,
        models.Token.refresh_token_expires < cutoff,
    ).delete()


@celery.task(acks_late=False)
@celery.task
def purge_removed_features():
    """Purge old feature flags from the database."""
    db = celery.request.db
    models.Feature.remove_old_flags(db)
8 changes: 4 additions & 4 deletions h/tasks/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class _BaseTaskWithRetry(ABC, Task):
retry_kwargs = {"countdown": 5, "max_retries": 1}


@celery.task(base=_BaseTaskWithRetry)
# acks_late=True: only acknowledge the message after the task completes, so a
# worker crash mid-task causes redelivery rather than a lost index update.
@celery.task(base=_BaseTaskWithRetry, acks_late=True)
def add_annotation(id_):
    """Add the annotation with the given ID to the search index."""
    service = celery.request.find_service(name="search_index")
    service.add_annotation_by_id(id_)
Expand Down Expand Up @@ -45,13 +45,13 @@ def add_group_annotations(groupid, tag, force, schedule_in):
)


@celery.task(base=_BaseTaskWithRetry)
# acks_late=True: only acknowledge the message after the task completes, so a
# worker crash mid-task causes redelivery rather than a missed deletion.
@celery.task(base=_BaseTaskWithRetry, acks_late=True)
def delete_annotation(id_):
    """Remove the annotation with the given ID from the search index."""
    service = celery.request.find_service(name="search_index")
    service.delete_annotation_by_id(id_)


@celery.task(acks_late=False)
@celery.task
def sync_annotations(limit):
search_index = celery.request.find_service(name="search_index")

Expand All @@ -66,7 +66,7 @@ def sync_annotations(limit):
)


@celery.task(acks_late=False)
@celery.task
def report_job_queue_metrics():
    """Record the job queue's metrics as New Relic custom metrics."""
    metrics_service = celery.request.find_service(name="job_queue_metrics")
    newrelic.agent.record_custom_metrics(metrics_service.metrics())
2 changes: 1 addition & 1 deletion h/tasks/mailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
log = get_task_logger(__name__)


@celery.task(bind=True, max_retries=3)
@celery.task(bind=True, max_retries=3, acks_late=True)
def send(self, recipients, subject, body, html=None):
"""
Send an email.
Expand Down

0 comments on commit 1f2b322

Please sign in to comment.