Commit
Merge pull request #41 from Koed00/dev
Workaround for osx implementation
Koed00 committed Aug 4, 2015
2 parents db19060 + b857386 commit 220183b
Showing 6 changed files with 53 additions and 21 deletions.
10 changes: 2 additions & 8 deletions django_q/cluster.py
@@ -22,12 +22,6 @@
# external
import arrow

# optional
try:
import psutil
except ImportError:
psutil = None

# Django
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
@@ -37,7 +31,7 @@
import signing
import tasks

from django_q.conf import Conf, redis_client, logger
from django_q.conf import Conf, redis_client, logger, psutil
from django_q.models import Task, Success, Schedule
from django_q.monitor import Status, Stat

@@ -149,7 +143,7 @@ def status(self):
if not self.start_event.is_set() and not self.stop_event.is_set():
return Conf.STARTING
elif self.start_event.is_set() and not self.stop_event.is_set():
if self.result_queue.qsize() == 0 and self.task_queue.qsize() == 0:
if Conf.QSIZE and self.result_queue.qsize() == 0 and self.task_queue.qsize() == 0:
return Conf.IDLE
return Conf.WORKING
elif self.stop_event.is_set() and self.start_event.is_set():
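The guarded status check in this hunk can be exercised outside the cluster. A minimal sketch, where `Conf` and `sentinel_status` are stand-ins for `django_q.conf.Conf` and the sentinel's `status()` method, with `QSIZE = False` modelling a platform (such as OS X) where `qsize()` is unavailable:

```python
from multiprocessing import Event, Queue

class Conf:
    # placeholder status constants standing in for django_q.conf.Conf
    STARTING, IDLE, WORKING, STOPPING, STOPPED = range(5)
    # False models platforms where Queue.qsize() raises NotImplementedError
    QSIZE = False

def sentinel_status(start_event, stop_event, task_queue, result_queue):
    if not start_event.is_set() and not stop_event.is_set():
        return Conf.STARTING
    elif start_event.is_set() and not stop_event.is_set():
        # only trust qsize() on platforms that implement it
        if Conf.QSIZE and result_queue.qsize() == 0 and task_queue.qsize() == 0:
            return Conf.IDLE
        return Conf.WORKING
    elif stop_event.is_set() and start_event.is_set():
        return Conf.STOPPING
    return Conf.STOPPED
```

Note the trade-off this workaround accepts: with `QSIZE` false, a running cluster always reports `WORKING`, never `IDLE`, because the queue depths cannot be inspected.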
32 changes: 29 additions & 3 deletions django_q/conf.py
@@ -1,11 +1,20 @@
import logging
from signal import signal
from multiprocessing import cpu_count
from multiprocessing import cpu_count, Queue

# django
from django.utils.translation import ugettext_lazy as _
from django.conf import settings

# external
import redis

# optional
try:
import psutil
except ImportError:
psutil = None


class Conf(object):
"""
@@ -36,8 +45,19 @@ class Conf(object):
# Maximum number of tasks that each cluster can work on
QUEUE_LIMIT = conf.get('queue_limit', None)

# Number of workers in the pool. Default is cpu count. +2 for monitor and pusher
WORKERS = conf.get('workers', cpu_count())
# Number of workers in the pool. Default is cpu count if implemented, otherwise 4.
WORKERS = conf.get('workers', False)
if not WORKERS:
try:
WORKERS = cpu_count()
# in rare cases this might fail
except NotImplementedError:
# try psutil
if psutil:
WORKERS = psutil.cpu_count() or 4
else:
# sensible default
WORKERS = 4
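The fallback chain added here can be written as a standalone helper. A sketch under the same assumptions as the diff (psutil optional); `default_workers` is an illustrative name, not part of the django_q API:

```python
from multiprocessing import cpu_count

# psutil is an optional dependency, as in django_q.conf
try:
    import psutil
except ImportError:
    psutil = None

def default_workers(configured=None):
    """Resolve the worker count: explicit setting, cpu_count(),
    psutil's count, or a sensible default of 4."""
    if configured:
        return configured
    try:
        return cpu_count()
    except NotImplementedError:
        # multiprocessing.cpu_count() can fail on some platforms
        if psutil:
            return psutil.cpu_count() or 4
        return 4
```

An explicit `workers` setting always wins; the platform probes only run when no value was configured.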

# Sets compression of redis packages
COMPRESSED = conf.get('compress', False)
@@ -63,6 +83,12 @@ class Conf(object):
# The redis stats key
Q_STAT = 'django_q:{}:cluster'.format(PREFIX)

# OSX doesn't implement qsize because of missing sem_getvalue()
try:
QSIZE = Queue().qsize() == 0
except NotImplementedError:
QSIZE = False
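The import-time probe above generalizes to any queue access. A sketch, with `queue_size` as a hypothetical wrapper rather than django_q code:

```python
from multiprocessing import Queue

# OS X lacks sem_getvalue(), so Queue.qsize() raises
# NotImplementedError there; probe once at import time.
try:
    QSIZE = Queue().qsize() == 0
except NotImplementedError:
    QSIZE = False

def queue_size(q):
    """Return q.qsize() where the platform supports it, else None."""
    return q.qsize() if QSIZE else None
```

Callers then treat `None` as "unknown" — the `Stat` change in this commit does the equivalent by defaulting both queue sizes to 0 when `Conf.QSIZE` is false.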

# Getting the signal names
SIGNAL_NAMES = dict((getattr(signal, n), n) for n in dir(signal) if n.startswith('SIG') and '_' not in n)

7 changes: 5 additions & 2 deletions django_q/monitor.py
@@ -117,10 +117,13 @@ def __init__(self, sentinel):
self.reincarnations = sentinel.reincarnations
self.sentinel = sentinel.pid
self.status = sentinel.status()
self.done_q_size = sentinel.result_queue.qsize()
self.done_q_size = 0
self.task_q_size = 0
if Conf.QSIZE:
self.done_q_size = sentinel.result_queue.qsize()
self.task_q_size = sentinel.task_queue.qsize()
if sentinel.monitor:
self.monitor = sentinel.monitor.pid
self.task_q_size = sentinel.task_queue.qsize()
if sentinel.pusher:
self.pusher = sentinel.pusher.pid
self.workers = [w.pid for w in sentinel.pool]
3 changes: 2 additions & 1 deletion django_q/tests/test_cluster.py
@@ -55,7 +55,8 @@ def test_cluster_initial(r):
assert c.is_starting is False
sleep(0.5)
stat = c.stat
assert stat.status == Conf.IDLE
if Conf.QSIZE:
assert stat.status == Conf.IDLE
assert c.stop() is True
assert c.sentinel.is_alive() is False
assert c.has_stopped
8 changes: 6 additions & 2 deletions docs/install.rst
@@ -59,7 +59,7 @@ This can be useful if you have several projects using the same Redis server.
workers
~~~~~~~

The number of workers to use in the cluster. Defaults to CPU count of the current host, but can be set to a custom number.
The number of workers to use in the cluster. Defaults to CPU count of the current host, but can be set to a custom number. [#f1]_

recycle
~~~~~~~
@@ -232,4 +232,8 @@ Optional
$ pip install hiredis


.. py:module:: django_q
.. py:module:: django_q
.. rubric:: Footnotes

.. [#f1] Uses :func:`multiprocessing.cpu_count` which can fail on some platforms. If so, please set the worker count in the configuration manually or install :ref:`psutil<psutil>` to provide an alternative CPU count method.
14 changes: 9 additions & 5 deletions docs/monitor.rst
@@ -30,7 +30,7 @@ State
Current state of the cluster:

- **Starting** The cluster is spawning workers and getting ready.
- **Idle** Everything is ok, but there are no tasks to process.
- **Idle** Everything is ok, but there are no tasks to process. [#f1]_
- **Working** Processing tasks like a good cluster should.
- **Stopping** The cluster does not take on any new tasks and is finishing.
- **Stopped** All tasks have been processed and the cluster is shutting down.
@@ -43,7 +43,7 @@ The current number of workers in the cluster pool.
TQ
~~

**Task Queue** counts the number of tasks in the queue
**Task Queue** counts the number of tasks in the queue [#f1]_

If this keeps rising it means you are taking on more tasks than your cluster can handle.
You can limit this by setting the :ref:`queue_limit` in your cluster configuration, after which it will turn green when that limit has been reached.
@@ -52,7 +52,7 @@ If your task queue is always hitting its limit and you're running out of resources
RQ
~~

**Result Queue** shows the number of results in the queue.
**Result Queue** shows the number of results in the queue. [#f1]_

Since results are only saved by a single process which has to access the database, it's normal for the result queue to take slightly longer to clear than the task queue.
@@ -118,11 +118,11 @@ Reference

.. py:attribute:: task_q_size
The number of tasks currently in the task queue.
The number of tasks currently in the task queue. [#f1]_

.. py:attribute:: done_q_size
The number of tasks currently in the result queue.
The number of tasks currently in the result queue. [#f1]_

.. py:attribute:: pusher
@@ -151,3 +151,7 @@ Reference
.. py:classmethod:: get_all(r=redis_client)
Returns a list of :class:`Stat` objects for all active clusters. Takes an optional redis connection.

.. rubric:: Footnotes

.. [#f1] Uses :meth:`multiprocessing.Queue.qsize` which is not implemented on OS X.
