From 4642deb786a699b528115de14f7d77ec70d84291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Bartosi=C5=84ski?= Date: Thu, 15 Dec 2022 11:46:14 +0100 Subject: [PATCH] Queue notification to optimize polling --- tasktiger/task.py | 10 ++++++---- tasktiger/tasktiger.py | 15 +++++++++++++++ tasktiger/worker.py | 30 +++++++++++++++++++++++++++--- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index 1a24ccfd..5eba43db 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -320,8 +320,8 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None): _key(from_state), queue, _key(from_state, queue), client=pipeline ) - if to_state == QUEUED and self.tiger.config["PUBLISH_QUEUED_TASKS"]: - pipeline.publish(_key("activity"), queue) + if to_state == QUEUED: + self.tiger._notify_queue(queue, client=pipeline) try: scripts.execute_pipeline(pipeline) @@ -396,8 +396,10 @@ def delay(self, when=None, max_queue_size=None): mode="nx", client=pipeline, ) - if state == QUEUED and tiger.config["PUBLISH_QUEUED_TASKS"]: - pipeline.publish(tiger._key("activity"), self.queue) + + if state == QUEUED: + tiger._notify_queue(self.queue, client=pipeline) + pipeline.execute() self._state = state diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index 20a8880d..4c5eda40 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -3,6 +3,7 @@ import datetime import importlib import logging +import secrets from collections import defaultdict import click @@ -256,6 +257,20 @@ def _get_current_tasks(self): current_task = property(_get_current_task) current_tasks = property(_get_current_tasks) + def _notify_queue(self, queue, client=None): + client = client or self.connection + + if self.config["PUBLISH_QUEUED_TASKS"]: + client.publish(self._key("activity"), queue) + + # XXX Maybe use each element from `dotted_parts` instead of just the queue root? + # Not sure about performance though. 
+ client.set( + self._key("queue_token", queue.split(".", 1)[0]), + secrets.token_hex(), + ex=7200, # XXX Just for tests so we don't clutter Redis + ) + @classproperty def current_instance(self): """ diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 4897ecf3..57f80a9d 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -65,6 +65,7 @@ def __init__( self._key = tiger._key self._did_work = True self._last_task_check = 0.0 + self._queue_set_token = "" self.stats_thread = None self.id = str(uuid.uuid4()) @@ -200,8 +201,7 @@ def _worker_queue_scheduled_tasks(self) -> None: # XXX: ideally this would be in the same pipeline, but we only want # to announce if there was a result. if result: - if self.config["PUBLISH_QUEUED_TASKS"]: - self.connection.publish(self._key("activity"), queue) + self.tiger._notify_queue(queue) self._did_work = True def _poll_for_queues(self) -> None: @@ -214,7 +214,31 @@ def _poll_for_queues(self) -> None: """ if not self._did_work: time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"]) - self._refresh_queue_set() + + if self._is_queue_set_out_of_date(): + self._refresh_queue_set() + self.log.info(f"Poll: Done ({len(self._queue_set)})") + + def _is_queue_set_out_of_date(self) -> bool: + if not self.only_queues: + return True + + queue_set_token = ":".join( + token or "" + for token in self.connection.mget( + sorted( + self._key("queue_token", queue) + for queue in self.only_queues + ) + ) + ) + + if queue_set_token != self._queue_set_token: + self.log.info("Poll: Token changed") + self._queue_set_token = queue_set_token + return True + + return False def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None: """