Skip to content

Commit

Permalink
Merge pull request #414 from crea-asia/cluster_id
Browse files Browse the repository at this point in the history
Differentiate between PID and unique cluster ID
  • Loading branch information
Koed00 committed Feb 14, 2020
2 parents 4faf577 + 93fab1e commit e3a4699
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 21 deletions.
17 changes: 10 additions & 7 deletions django_q/cluster.py
Expand Up @@ -14,6 +14,7 @@
import signal
import socket
import traceback
import uuid
# Django
from django import db
from django.utils import timezone
Expand All @@ -38,6 +39,7 @@ def __init__(self, broker=None):
self.stop_event = None
self.start_event = None
self.pid = current_process().pid
self.cluster_id = uuid.uuid4()
self.host = socket.gethostname()
self.timeout = Conf.TIMEOUT
signal.signal(signal.SIGTERM, self.sig_handler)
Expand All @@ -48,20 +50,20 @@ def start(self):
self.stop_event = Event()
self.start_event = Event()
self.sentinel = Process(target=Sentinel,
args=(self.stop_event, self.start_event, self.broker, self.timeout))
args=(self.stop_event, self.start_event, self.cluster_id, self.broker, self.timeout))
self.sentinel.start()
logger.info(_('Q Cluster-{} starting.').format(self.pid))
logger.info(_('Q Cluster-{} starting.').format(self.cluster_id))
while not self.start_event.is_set():
sleep(0.1)
return self.pid

def stop(self):
if not self.sentinel.is_alive():
return False
logger.info(_('Q Cluster-{} stopping.').format(self.pid))
logger.info(_('Q Cluster-{} stopping.').format(self.cluster_id))
self.stop_event.set()
self.sentinel.join()
logger.info(_('Q Cluster-{} has stopped.').format(self.pid))
logger.info(_('Q Cluster-{} has stopped.').format(self.cluster_id))
self.start_event = None
self.stop_event = None
return True
Expand All @@ -74,8 +76,8 @@ def sig_handler(self, signum, frame):
@property
def stat(self):
if self.sentinel:
return Stat.get(self.pid)
return Status(self.pid)
return Stat.get(pid=self.pid, cluster_id=self.cluster_id)
return Status(pid=self.pid, cluster_id=self.cluster_id)

@property
def is_starting(self):
Expand All @@ -95,11 +97,12 @@ def has_stopped(self):


class Sentinel(object):
def __init__(self, stop_event, start_event, broker=None, timeout=Conf.TIMEOUT, start=True):
def __init__(self, stop_event, start_event, cluster_id, broker=None, timeout=Conf.TIMEOUT, start=True):
# Make sure we catch signals for the pool
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.pid = current_process().pid
self.cluster_id = cluster_id
self.parent_pid = get_ppid()
self.name = current_process().name
self.broker = broker or get_broker()
Expand Down
2 changes: 1 addition & 1 deletion django_q/monitor.py
Expand Up @@ -72,7 +72,7 @@ def monitor(run_once=False, broker=None):
uptime = '%d:%02d:%02d' % (hours, minutes, seconds)
# print to the terminal
print(term.move(i, 0) + term.center(stat.host[:col_width - 1], width=col_width - 1))
print(term.move(i, 1 * col_width) + term.center(stat.cluster_id, width=col_width - 1))
print(term.move(i, 1 * col_width) + term.center(str(stat.cluster_id)[-8:], width=col_width - 1))
print(term.move(i, 2 * col_width) + term.center(status, width=col_width - 1))
print(term.move(i, 3 * col_width) + term.center(workers, width=col_width - 1))
print(term.move(i, 4 * col_width) + term.center(tasks, width=col_width - 1))
Expand Down
11 changes: 6 additions & 5 deletions django_q/status.py
Expand Up @@ -8,11 +8,12 @@
class Status(object):
"""Cluster status base class."""

def __init__(self, pid):
def __init__(self, pid, cluster_id):
self.workers = []
self.tob = None
self.reincarnations = 0
self.cluster_id = pid
self.pid = pid
self.cluster_id = cluster_id
self.sentinel = 0
self.status = Conf.STOPPED
self.done_q_size = 0
Expand All @@ -27,7 +28,7 @@ class Stat(Status):
"""Status object for Cluster monitoring."""

def __init__(self, sentinel):
super(Stat, self).__init__(sentinel.parent_pid or sentinel.pid)
super(Stat, self).__init__(sentinel.parent_pid or sentinel.pid, cluster_id=sentinel.cluster_id)
self.broker = sentinel.broker or get_broker()
self.tob = sentinel.tob
self.reincarnations = sentinel.reincarnations
Expand Down Expand Up @@ -72,7 +73,7 @@ def empty_queues(self):
return self.done_q_size + self.task_q_size == 0

@staticmethod
def get(cluster_id, broker=None):
def get(pid, cluster_id, broker=None):
"""
gets the current status for the cluster
:param cluster_id: id of the cluster
Expand All @@ -86,7 +87,7 @@ def get(cluster_id, broker=None):
return SignedPackage.loads(pack)
except BadSignature:
return None
return Status(cluster_id)
return Status(pid=pid, cluster_id=cluster_id)

@staticmethod
def get_all(broker=None):
Expand Down
18 changes: 12 additions & 6 deletions django_q/tests/test_cluster.py
Expand Up @@ -3,6 +3,7 @@
from multiprocessing import Event, Value
from time import sleep
from django.utils import timezone
import uuid as uuidlib

import os
import pytest
Expand Down Expand Up @@ -72,7 +73,8 @@ def test_sentinel():
start_event = Event()
stop_event = Event()
stop_event.set()
s = Sentinel(stop_event, start_event, broker=get_broker('sentinel_test:q'))
cluster_id = uuidlib.uuid4()
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=get_broker('sentinel_test:q'))
assert start_event.is_set()
assert s.status() == Conf.STOPPED

Expand Down Expand Up @@ -256,9 +258,10 @@ def test_timeout(broker, cluster_config_timeout, async_task_kwargs):
async_task('time.sleep', 5, broker=broker, **async_task_kwargs)
start_event = Event()
stop_event = Event()
cluster_id = uuidlib.uuid4()
# Set a timer to stop the Sentinel
threading.Timer(3, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout)
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker, timeout=cluster_config_timeout)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 1
Expand All @@ -279,9 +282,10 @@ def test_timeout_task_finishes(broker, cluster_config_timeout, async_task_kwargs
async_task('time.sleep', 3, broker=broker, **async_task_kwargs)
start_event = Event()
stop_event = Event()
cluster_id = uuidlib.uuid4()
# Set a timer to stop the Sentinel
threading.Timer(6, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker, timeout=cluster_config_timeout)
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker, timeout=cluster_config_timeout)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 0
Expand All @@ -297,12 +301,13 @@ def test_recycle(broker, monkeypatch):
async_task('django_q.tests.tasks.multiply', 2, 2, broker=broker)
start_event = Event()
stop_event = Event()
cluster_id = uuidlib.uuid4()
# override settings
monkeypatch.setattr(Conf, 'RECYCLE', 2)
monkeypatch.setattr(Conf, 'WORKERS', 1)
# set a timer to stop the Sentinel
threading.Timer(3, stop_event.set).start()
s = Sentinel(stop_event, start_event, broker=broker)
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 1
Expand Down Expand Up @@ -333,13 +338,14 @@ def test_bad_secret(broker, monkeypatch):
stop_event = Event()
stop_event.set()
start_event = Event()
s = Sentinel(stop_event, start_event, broker=broker, start=False)
cluster_id = uuidlib.uuid4()
s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker, start=False)
Stat(s).save()
# change the SECRET
monkeypatch.setattr(Conf, "SECRET_KEY", "OOPS")
stat = Stat.get_all()
assert len(stat) == 0
assert Stat.get(s.parent_pid) is None
assert Stat.get(pid=s.parent_pid, cluster_id=cluster_id) is None
task_queue = Queue()
pusher(task_queue, stop_event, broker=broker)
result_queue = Queue()
Expand Down
6 changes: 4 additions & 2 deletions django_q/tests/test_monitor.py
@@ -1,4 +1,5 @@
import pytest
import uuid

from django_q.tasks import async_task
from django_q.brokers import get_broker
Expand All @@ -10,15 +11,16 @@

@pytest.mark.django_db
def test_monitor(monkeypatch):
assert Stat.get(0).sentinel == 0
cluster_id = uuid.uuid4()
assert Stat.get(pid=0, cluster_id=cluster_id).sentinel == 0
c = Cluster()
c.start()
stats = monitor(run_once=True)
c.stop()
assert len(stats) > 0
found_c = False
for stat in stats:
if stat.cluster_id == c.pid:
if stat.cluster_id == c.cluster_id:
found_c = True
assert stat.uptime() > 0
assert stat.empty_queues() is True
Expand Down

0 comments on commit e3a4699

Please sign in to comment.