Skip to content

Commit

Permalink
Queue notification to optimize polling
Browse files Browse the repository at this point in the history
  • Loading branch information
neob91-close committed Dec 15, 2022
1 parent b1d0db4 commit 4642deb
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 7 deletions.
10 changes: 6 additions & 4 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None):
_key(from_state), queue, _key(from_state, queue), client=pipeline
)

if to_state == QUEUED and self.tiger.config["PUBLISH_QUEUED_TASKS"]:
pipeline.publish(_key("activity"), queue)
if to_state == QUEUED:
self.tiger._notify_queue(queue, client=pipeline)

try:
scripts.execute_pipeline(pipeline)
Expand Down Expand Up @@ -396,8 +396,10 @@ def delay(self, when=None, max_queue_size=None):
mode="nx",
client=pipeline,
)
if state == QUEUED and tiger.config["PUBLISH_QUEUED_TASKS"]:
pipeline.publish(tiger._key("activity"), self.queue)

if state == QUEUED:
tiger._notify_queue(self.queue, client=pipeline)

pipeline.execute()

self._state = state
Expand Down
15 changes: 15 additions & 0 deletions tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime
import importlib
import logging
import secrets
from collections import defaultdict

import click
Expand Down Expand Up @@ -256,6 +257,20 @@ def _get_current_tasks(self):
current_task = property(_get_current_task)
current_tasks = property(_get_current_tasks)

def _notify_queue(self, queue, client=None, token_ttl=7200):
    """
    Announce that tasks were queued in the given queue.

    Publishes the queue name on the "activity" channel when the
    PUBLISH_QUEUED_TASKS config option is enabled, and always stores a
    fresh random token under the "queue_token" key of the queue's root
    (the part of the name before the first dot).

    Args:
        queue: Name of the queue (may be a dotted name such as ``a.b.c``).
        client: Redis client or pipeline used to issue the commands.
            Defaults to ``self.connection``.
        token_ttl: Expiry of the token key in seconds, so that stale tokens
            don't clutter Redis.
    """
    # Compare against None explicitly: a client/pipeline object that
    # defines __len__ is falsy while empty and must not be swapped for the
    # default connection.
    if client is None:
        client = self.connection

    if self.config["PUBLISH_QUEUED_TASKS"]:
        client.publish(self._key("activity"), queue)

    # XXX Maybe use each element from `dotted_parts` instead of just the
    # queue root? Not sure about performance though.
    root_queue = queue.split(".", 1)[0]
    client.set(
        self._key("queue_token", root_queue),
        secrets.token_hex(),
        ex=token_ttl,
    )

@classproperty
def current_instance(self):
"""
Expand Down
30 changes: 27 additions & 3 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(
self._key = tiger._key
self._did_work = True
self._last_task_check = 0.0
self._queue_set_token = ""
self.stats_thread = None
self.id = str(uuid.uuid4())

Expand Down Expand Up @@ -200,8 +201,7 @@ def _worker_queue_scheduled_tasks(self) -> None:
# XXX: ideally this would be in the same pipeline, but we only want
# to announce if there was a result.
if result:
if self.config["PUBLISH_QUEUED_TASKS"]:
self.connection.publish(self._key("activity"), queue)
self.tiger._notify_queue(queue)
self._did_work = True

def _poll_for_queues(self) -> None:
Expand All @@ -214,7 +214,31 @@ def _poll_for_queues(self) -> None:
"""
if not self._did_work:
time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"])
self._refresh_queue_set()

if self._is_queue_set_out_of_date():
self._refresh_queue_set()
self.log.info(f"Poll: Done ({len(self._queue_set)})")

def _is_queue_set_out_of_date(self) -> bool:
    """
    Return whether the queue set should be refreshed.

    Without an ``only_queues`` restriction this always returns True.
    Otherwise the "queue_token" values of the watched queues are fetched
    with a single MGET and combined into a fingerprint; the result is True
    when that fingerprint differs from the one remembered in
    ``self._queue_set_token``, which is then updated.
    """
    if not self.only_queues:
        return True

    # Tokens are stored per queue *root* (the part before the first dot),
    # so look them up by root; a set removes duplicates such as "a.b" and
    # "a.c" sharing the root "a".
    token_keys = sorted(
        {
            self._key("queue_token", queue.split(".", 1)[0])
            for queue in self.only_queues
        }
    )
    tokens = self.connection.mget(token_keys)

    # Pair every key with its token so the fingerprint is never the empty
    # string: the very first poll then always reports a change, even when
    # no token exists yet (missing tokens come back as None).
    fingerprint = ";".join(
        f"{key}={token or ''}" for key, token in zip(token_keys, tokens)
    )

    if fingerprint == self._queue_set_token:
        return False

    self.log.info("Poll: Token changed")
    self._queue_set_token = fingerprint
    return True

def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None:
"""
Expand Down

0 comments on commit 4642deb

Please sign in to comment.