diff --git a/django_q/cluster.py b/django_q/cluster.py
index f7332fb5..7329f819 100644
--- a/django_q/cluster.py
+++ b/django_q/cluster.py
@@ -22,12 +22,6 @@
 # external
 import arrow
 
-# optional
-try:
-    import psutil
-except ImportError:
-    psutil = None
-
 # Django
 from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
@@ -37,7 +31,7 @@
 import signing
 import tasks
 
-from django_q.conf import Conf, redis_client, logger
+from django_q.conf import Conf, redis_client, logger, psutil
 from django_q.models import Task, Success, Schedule
 from django_q.monitor import Status, Stat
 
@@ -149,7 +143,7 @@ def status(self):
         if not self.start_event.is_set() and not self.stop_event.is_set():
             return Conf.STARTING
         elif self.start_event.is_set() and not self.stop_event.is_set():
-            if self.result_queue.qsize() == 0 and self.task_queue.qsize() == 0:
+            if Conf.QSIZE and self.result_queue.qsize() == 0 and self.task_queue.qsize() == 0:
                 return Conf.IDLE
             return Conf.WORKING
         elif self.stop_event.is_set() and self.start_event.is_set():
diff --git a/django_q/conf.py b/django_q/conf.py
index a2b35170..9c57b566 100644
--- a/django_q/conf.py
+++ b/django_q/conf.py
@@ -1,11 +1,20 @@
 import logging
 from signal import signal
-from multiprocessing import cpu_count
+from multiprocessing import cpu_count, Queue
 
+# django
 from django.utils.translation import ugettext_lazy as _
 from django.conf import settings
+
+# external
 import redis
 
+# optional
+try:
+    import psutil
+except ImportError:
+    psutil = None
+
 class Conf(object):
 
     """
@@ -36,8 +45,19 @@
     # Maximum number of tasks that each cluster can work on
     QUEUE_LIMIT = conf.get('queue_limit', None)
 
-    # Number of workers in the pool. Default is cpu count. +2 for monitor and pusher
-    WORKERS = conf.get('workers', cpu_count())
+    # Number of workers in the pool. Default is cpu count if implemented, otherwise 4.
+    WORKERS = conf.get('workers', False)
+    if not WORKERS:
+        try:
+            WORKERS = cpu_count()
+        # in rare cases this might fail
+        except NotImplementedError:
+            # try psutil
+            if psutil:
+                WORKERS = psutil.cpu_count() or 4
+            else:
+                # sensible default
+                WORKERS = 4
 
     # Sets compression of redis packages
     COMPRESSED = conf.get('compress', False)
@@ -63,6 +83,12 @@
     # The redis stats key
     Q_STAT = 'django_q:{}:cluster'.format(PREFIX)
 
+    # OSX doesn't implement qsize because of missing sem_getvalue()
+    try:
+        QSIZE = Queue().qsize() == 0
+    except NotImplementedError:
+        QSIZE = False
+
     # Getting the signal names
     SIGNAL_NAMES = dict((getattr(signal, n), n) for n in dir(signal) if n.startswith('SIG') and '_' not in n)
 
diff --git a/django_q/monitor.py b/django_q/monitor.py
index 92bd7ffe..3b001c1f 100644
--- a/django_q/monitor.py
+++ b/django_q/monitor.py
@@ -117,10 +117,13 @@ def __init__(self, sentinel):
         self.reincarnations = sentinel.reincarnations
         self.sentinel = sentinel.pid
         self.status = sentinel.status()
-        self.done_q_size = sentinel.result_queue.qsize()
+        self.done_q_size = 0
+        self.task_q_size = 0
+        if Conf.QSIZE:
+            self.done_q_size = sentinel.result_queue.qsize()
+            self.task_q_size = sentinel.task_queue.qsize()
         if sentinel.monitor:
             self.monitor = sentinel.monitor.pid
-        self.task_q_size = sentinel.task_queue.qsize()
         if sentinel.pusher:
             self.pusher = sentinel.pusher.pid
         self.workers = [w.pid for w in sentinel.pool]
diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py
index 684bb863..7d956b64 100644
--- a/django_q/tests/test_cluster.py
+++ b/django_q/tests/test_cluster.py
@@ -55,7 +55,8 @@ def test_cluster_initial(r):
     assert c.is_starting is False
     sleep(0.5)
     stat = c.stat
-    assert stat.status == Conf.IDLE
+    if Conf.QSIZE:
+        assert stat.status == Conf.IDLE
     assert c.stop() is True
     assert c.sentinel.is_alive() is False
     assert c.has_stopped
diff --git a/docs/install.rst b/docs/install.rst
index 67f8279a..5f2ff5ae 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -59,7 +59,7 @@ This can be useful if you have several projects using the same Redis server.
 
 workers
 ~~~~~~~
-The number of workers to use in the cluster. Defaults to CPU count of the current host, but can be set to a custom number.
+The number of workers to use in the cluster. Defaults to CPU count of the current host, but can be set to a custom number. [#f1]_
 
 recycle
 ~~~~~~~
@@ -232,4 +232,8 @@ Optional
 
     $ pip install hiredis
 
-.. py:module:: django_q
\ No newline at end of file
+.. py:module:: django_q
+
+.. rubric:: Footnotes
+
+.. [#f1] Uses :func:`multiprocessing.cpu_count()` which can fail on some platforms. If so, please set the worker count in the configuration manually or install :ref:`psutil` to provide an alternative CPU count method.
diff --git a/docs/monitor.rst b/docs/monitor.rst
index 8da64a11..7e99a4ad 100644
--- a/docs/monitor.rst
+++ b/docs/monitor.rst
@@ -30,7 +30,7 @@ State
 Current state of the cluster:
 
 - **Starting** The cluster is spawning workers and getting ready.
-- **Idle** Everything is ok, but there are no tasks to process.
+- **Idle** Everything is ok, but there are no tasks to process. [#f1]_
 - **Working** Processing tasks like a good cluster should.
 - **Stopping** The cluster does not take on any new tasks and is finishing.
 - **Stopped** All tasks have been processed and the cluster is shutting down.
@@ -43,7 +43,7 @@ The current number of workers in the cluster pool.
 
 TQ
 ~~
-**Task Queue** counts the number of tasks in the queue
+**Task Queue** counts the number of tasks in the queue. [#f1]_
 If this keeps rising it means you are taking on more tasks than your cluster can handle.
 You can limit this by settings the :ref:`queue_limit` in your cluster configuration, after which it will turn green when that limit has been reached.
 
@@ -52,7 +52,7 @@ If your task queue is always hitting its limit and your running out of resources
 
 RQ
 ~~
-**Result Queue** shows the number of results in the queue.
+**Result Queue** shows the number of results in the queue. [#f1]_
 Since results are only saved by a single process which has to access the database.
 It's normal for the result queue to take slightly longer to clear than the task queue.
 
@@ -118,11 +118,11 @@ Reference
 
     .. py:attribute:: task_q_size
 
-        The number of tasks currently in the task queue.
+        The number of tasks currently in the task queue. [#f1]_
 
     .. py:attribute:: done_q_size
 
-        The number of tasks currently in the result queue.
+        The number of tasks currently in the result queue. [#f1]_
 
     .. py:attribute:: pusher
 
@@ -151,3 +151,7 @@
     .. py:classmethod:: get_all(r=redis_client)
 
         Returns a list of :class:`Stat` objects for all active clusters. Takes an optional redis connection.
+
+.. rubric:: Footnotes
+
+.. [#f1] Uses :meth:`multiprocessing.Queue.qsize()` which is not implemented on OS X.
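
Reviewer note: the heart of this patch is the one-time capability probe stored in Conf.QSIZE. A standalone sketch of the same pattern, not part of the patch, just for illustration:

    from multiprocessing import Queue

    # Probe once at import time: on OS X, Queue.qsize() raises
    # NotImplementedError because the platform lacks sem_getvalue().
    try:
        QSIZE = Queue().qsize() == 0
    except NotImplementedError:
        QSIZE = False

    q = Queue()
    q.put('task')
    if QSIZE:
        print('tasks queued:', q.qsize())
    else:
        print('qsize() is not implemented on this platform')

With QSIZE false, the status() check in cluster.py can never observe both queues as empty, so a running cluster reports WORKING rather than IDLE; that is also why test_cluster_initial only asserts IDLE when Conf.QSIZE is true.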
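The new worker-count default applies the same defensive style. Below is a condensed, self-contained version of the fallback chain; the conf dict is a stand-in for the Q_CLUSTER settings dict that conf.py reads:

    from multiprocessing import cpu_count

    # optional dependency, mirroring the import that moved into conf.py
    try:
        import psutil
    except ImportError:
        psutil = None

    conf = {}  # stand-in for getattr(settings, 'Q_CLUSTER', {})

    workers = conf.get('workers', False)
    if not workers:
        try:
            workers = cpu_count()
        except NotImplementedError:
            # cpu_count() is not implemented on every platform; try psutil
            # first, then fall back to a sensible default of 4
            workers = (psutil.cpu_count() or 4) if psutil else 4
    print('worker pool size:', workers)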
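For anyone who does hit the documented cpu_count() failure, the install.rst footnote recommends pinning the count manually. A minimal sketch of that in a project's settings.py, with the name and values purely illustrative:

    # settings.py
    Q_CLUSTER = {
        'name': 'myproject',  # illustrative
        'workers': 4,         # explicit value, skips the cpu_count() fallback
    }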