Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Differentiate between PID and unique cluster ID #414

Merged
merged 14 commits on Feb 14, 2020
17 changes: 10 additions & 7 deletions django_q/cluster.py
Expand Up @@ -14,6 +14,7 @@
import signal
import socket
import traceback
import uuid
# Django
from django import db
from django.utils import timezone
Expand All @@ -38,6 +39,7 @@ def __init__(self, broker=None):
self.stop_event = None
self.start_event = None
self.pid = current_process().pid
self.cluster_id = uuid.uuid4()
self.host = socket.gethostname()
self.timeout = Conf.TIMEOUT
signal.signal(signal.SIGTERM, self.sig_handler)
Expand All @@ -48,20 +50,20 @@ def start(self):
self.stop_event = Event()
self.start_event = Event()
self.sentinel = Process(target=Sentinel,
args=(self.stop_event, self.start_event, self.broker, self.timeout))
args=(self.stop_event, self.start_event, self.cluster_id, self.broker, self.timeout))
self.sentinel.start()
logger.info(_('Q Cluster-{} starting.').format(self.pid))
logger.info(_('Q Cluster-{} starting.').format(self.cluster_id))
while not self.start_event.is_set():
sleep(0.1)
return self.pid

def stop(self):
if not self.sentinel.is_alive():
return False
logger.info(_('Q Cluster-{} stopping.').format(self.pid))
logger.info(_('Q Cluster-{} stopping.').format(self.cluster_id))
self.stop_event.set()
self.sentinel.join()
logger.info(_('Q Cluster-{} has stopped.').format(self.pid))
logger.info(_('Q Cluster-{} has stopped.').format(self.cluster_id))
self.start_event = None
self.stop_event = None
return True
Expand All @@ -74,8 +76,8 @@ def sig_handler(self, signum, frame):
@property
def stat(self):
if self.sentinel:
return Stat.get(self.pid)
return Status(self.pid)
return Stat.get(pid=self.pid, cluster_id=self.cluster_id)
return Status(pid=self.pid, cluster_id=self.cluster_id)

@property
def is_starting(self):
Expand All @@ -95,11 +97,12 @@ def has_stopped(self):


class Sentinel(object):
def __init__(self, stop_event, start_event, broker=None, timeout=Conf.TIMEOUT, start=True):
def __init__(self, stop_event, start_event, cluster_id, broker=None, timeout=Conf.TIMEOUT, start=True):
# Make sure we catch signals for the pool
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.pid = current_process().pid
self.cluster_id = cluster_id
self.parent_pid = get_ppid()
self.name = current_process().name
self.broker = broker or get_broker()
Expand Down
2 changes: 1 addition & 1 deletion django_q/monitor.py
Expand Up @@ -72,7 +72,7 @@ def monitor(run_once=False, broker=None):
uptime = '%d:%02d:%02d' % (hours, minutes, seconds)
# print to the terminal
print(term.move(i, 0) + term.center(stat.host[:col_width - 1], width=col_width - 1))
print(term.move(i, 1 * col_width) + term.center(stat.cluster_id, width=col_width - 1))
print(term.move(i, 1 * col_width) + term.center(str(stat.cluster_id)[-8:], width=col_width - 1))
print(term.move(i, 2 * col_width) + term.center(status, width=col_width - 1))
print(term.move(i, 3 * col_width) + term.center(workers, width=col_width - 1))
print(term.move(i, 4 * col_width) + term.center(tasks, width=col_width - 1))
Expand Down
11 changes: 6 additions & 5 deletions django_q/status.py
Expand Up @@ -8,11 +8,12 @@
class Status(object):
"""Cluster status base class."""

def __init__(self, pid):
def __init__(self, pid, cluster_id):
self.workers = []
self.tob = None
self.reincarnations = 0
self.cluster_id = pid
self.pid = pid
self.cluster_id = cluster_id
self.sentinel = 0
self.status = Conf.STOPPED
self.done_q_size = 0
Expand All @@ -27,7 +28,7 @@ class Stat(Status):
"""Status object for Cluster monitoring."""

def __init__(self, sentinel):
super(Stat, self).__init__(sentinel.parent_pid or sentinel.pid)
super(Stat, self).__init__(sentinel.parent_pid or sentinel.pid, cluster_id=sentinel.cluster_id)
self.broker = sentinel.broker or get_broker()
self.tob = sentinel.tob
self.reincarnations = sentinel.reincarnations
Expand Down Expand Up @@ -72,7 +73,7 @@ def empty_queues(self):
return self.done_q_size + self.task_q_size == 0

@staticmethod
def get(cluster_id, broker=None):
def get(pid, cluster_id, broker=None):
"""
gets the current status for the cluster
:param cluster_id: id of the cluster
Expand All @@ -86,7 +87,7 @@ def get(cluster_id, broker=None):
return SignedPackage.loads(pack)
except BadSignature:
return None
return Status(cluster_id)
return Status(pid=pid, cluster_id=cluster_id)

@staticmethod
def get_all(broker=None):
Expand Down
18 changes: 12 additions & 6 deletions django_q/tests/test_cluster.py
Expand Up @@ -3,6 +3,7 @@
from multiprocessing import Event, Value
from time import sleep
from django.utils import timezone
import uuid as uuidlib

import os
import pytest
Expand Down Expand Up @@ -72,7 +73,8 @@ def test_sentinel():
start_event = Event()
stop_event = Event()
stop_event.set()
s = Sentinel(stop_event, start_event, broker=get_broker('sentinel_test:q'))
cluster_id = uuidlib.uuid4()
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=get_broker('sentinel_test:q'))
assert start_event.is_set()
assert s.status() == Conf.STOPPED

Expand Down Expand Up @@ -256,9 +258,10 @@ def test_timeout(broker, cluster_config_timeout, async_task_kwargs):
async_task('time.sleep', 5, broker=broker, **async_task_kwargs)
start_event = Event()
stop_event = Event()
cluster_id = uuidlib.uuid4()
# Set a timer to stop the Sentinel
threading.Timer(3, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout)
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker, timeout=cluster_config_timeout)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 1
Expand All @@ -279,9 +282,10 @@ def test_timeout_task_finishes(broker, cluster_config_timeout, async_task_kwargs
async_task('time.sleep', 3, broker=broker, **async_task_kwargs)
start_event = Event()
stop_event = Event()
cluster_id = uuidlib.uuid4()
# Set a timer to stop the Sentinel
threading.Timer(6, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout)
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker, timeout=cluster_config_timeout)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 0
Expand All @@ -297,12 +301,13 @@ def test_recycle(broker, monkeypatch):
async_task('django_q.tests.tasks.multiply', 2, 2, broker=broker)
start_event = Event()
stop_event = Event()
cluster_id = uuidlib.uuid4()
# override settings
monkeypatch.setattr(Conf, 'RECYCLE', 2)
monkeypatch.setattr(Conf, 'WORKERS', 1)
# set a timer to stop the Sentinel
threading.Timer(3, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker)
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 1
Expand Down Expand Up @@ -333,13 +338,14 @@ def test_bad_secret(broker, monkeypatch):
stop_event = Event()
stop_event.set()
start_event = Event()
s = Sentinel(stop_event, start_event, broker=broker, start=False)
cluster_id = uuidlib.uuid4()
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker, start=False)
Stat(s).save()
# change the SECRET
monkeypatch.setattr(Conf, "SECRET_KEY", "OOPS")
stat = Stat.get_all()
assert len(stat) == 0
assert Stat.get(s.parent_pid) is None
assert Stat.get(pid=s.parent_pid, cluster_id=cluster_id) is None
task_queue = Queue()
pusher(task_queue, stop_event, broker=broker)
result_queue = Queue()
Expand Down
6 changes: 4 additions & 2 deletions django_q/tests/test_monitor.py
@@ -1,4 +1,5 @@
import pytest
import uuid

from django_q.tasks import async_task
from django_q.brokers import get_broker
Expand All @@ -10,15 +11,16 @@

@pytest.mark.django_db
def test_monitor(monkeypatch):
assert Stat.get(0).sentinel == 0
cluster_id = uuid.uuid4()
assert Stat.get(pid=0, cluster_id=cluster_id).sentinel == 0
c = Cluster()
c.start()
stats = monitor(run_once=True)
c.stop()
assert len(stats) > 0
found_c = False
for stat in stats:
if stat.cluster_id == c.pid:
if stat.cluster_id == c.cluster_id:
found_c = True
assert stat.uptime() > 0
assert stat.empty_queues() is True
Expand Down