Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #122 from lcwill/worker-pids-fix

worker_pids() does not return correct set of pyres worker pids
  • Loading branch information...
commit d936b45142d15f733904ca9d2c5c636225d95eb1 2 parents 104df5b + e119d7e
@binarydud authored
Showing with 41 additions and 6 deletions.
  1. +1 −1  pyres/__init__.py
  2. +9 −5 pyres/worker.py
  3. +31 −0 tests/test_worker.py
View
2  pyres/__init__.py
@@ -260,7 +260,7 @@ def reserve(self, queues):
return Job.reserve(queues, self)
def __str__(self):
- return "PyRes Client connected to %s" % self.redis.server
+ return "PyRes Client connected to %s" % self.dsn
def workers(self):
from pyres.worker import Worker
View
14 pyres/worker.py
@@ -75,7 +75,7 @@ def unregister_worker(self):
def prune_dead_workers(self):
all_workers = Worker.all(self.resq)
- known_workers = self.worker_pids()
+ known_workers = Worker.worker_pids()
for worker in all_workers:
host, pid, queues = worker.id.split(':')
if host != self.hostname:
@@ -329,12 +329,16 @@ def state(self):
return 'working'
return 'idle'
- def worker_pids(self):
+ @classmethod
+ def worker_pids(cls):
"""Returns an array of all pids (as strings) of the workers on
this machine. Used when pruning dead workers."""
- return map(lambda l: l.strip().split(' ')[0],
- commands.getoutput("ps -A -o pid,command | \
- grep pyres_worker").split("\n"))
+ cmd = "ps -A -o pid,command | grep pyres_worker | grep -v grep"
+ output = commands.getoutput(cmd)
+ if output:
+ return map(lambda l: l.strip().split(' ')[0], output.split("\n"))
+ else:
+ return []
@classmethod
def run(cls, queues, server="localhost:6379", interval=None, timeout=None):
View
31 tests/test_worker.py
@@ -296,5 +296,36 @@ def test_retries_give_up_eventually(self):
assert None == worker.process()
assert worker.get_failed() == 1
+ def test_worker_pids(self):
+ # spawn worker processes and get pids
+ pids = []
+ pids.append(self.spawn_worker(['basic']))
+ pids.append(self.spawn_worker(['basic']))
+ time.sleep(1)
+ worker_pids = Worker.worker_pids()
+
+ # send kill signal to workers and wait for them to exit
+ import signal
+ for pid in pids:
+ os.kill(pid, signal.SIGQUIT)
+ os.waitpid(pid, 0)
+
+ # ensure worker_pids() returned the correct pids
+ for pid in pids:
+ assert str(pid) in worker_pids
+
+ # ensure the workers are no longer returned by worker_pids()
+ worker_pids = Worker.worker_pids()
+ for pid in pids:
+ assert str(pid) not in worker_pids
+
+ def spawn_worker(self, queues):
+ pid = os.fork()
+ if not pid:
+ Worker.run(queues, interval=1)
+ os._exit(0)
+ else:
+ return pid
+
def set_current_time(self, time):
ResQ._current_time = staticmethod(lambda: time)
Please sign in to comment.
Something went wrong with that request. Please try again.