From 410a55295a35d63469f5a6296c85c5c353198096 Mon Sep 17 00:00:00 2001 From: Tarek Ziade Date: Mon, 30 Jan 2012 14:20:47 -0800 Subject: [PATCH] removed the multipart usage - we'll own single msg --- examples/square_master.py | 2 ++ powerhose/client/pinger.py | 4 ++-- powerhose/client/worker.py | 10 +++++----- powerhose/jobrunner.py | 9 +++++---- powerhose/workermgr.py | 4 +++- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/examples/square_master.py b/examples/square_master.py index e9fe7ae..376efb2 100644 --- a/examples/square_master.py +++ b/examples/square_master.py @@ -27,6 +27,8 @@ runner.stop() print 'bye' except Exception, e: + import pdb; pdb.set_trace() + print str(e) print 'Something went wrong (are we still running workers?)' runner.stop() print 'bye' diff --git a/powerhose/client/pinger.py b/powerhose/client/pinger.py index 9f84eb3..d714fa3 100644 --- a/powerhose/client/pinger.py +++ b/powerhose/client/pinger.py @@ -48,8 +48,8 @@ def run(self): with self.locker: try: - self.socket.send_multipart(['PING', self.identity], - zmq.NOBLOCK) + self.socket.send(':::'.join(['PING', self.identity]), + zmq.NOBLOCK) except zmq.ZMQError, e: num_failed += 1 continue diff --git a/powerhose/client/worker.py b/powerhose/client/worker.py index 2d24b62..52f689c 100644 --- a/powerhose/client/worker.py +++ b/powerhose/client/worker.py @@ -3,7 +3,7 @@ import zmq import threading -from powerhose.pinger import Pinger +from powerhose.client.pinger import Pinger class RegisterError(Exception): @@ -50,8 +50,8 @@ def _msg(self, req, rep): # ping the master we are online, with an ID try: - self.master.send_multipart([req, self.identity], - zmq.NOBLOCK) + self.master.send(':::'.join([req, self.identity]), + zmq.NOBLOCK) except zmq.ZMQError: raise RegisterError() @@ -88,7 +88,7 @@ def run(self): break for socket in events: - msg = socket.recv_multipart() + msg = socket.recv().split(':::') if msg == ['WAKE']: # yeah I can work socket.send('GIVE') @@ -101,6 +101,6 @@ def run(self): # XXX log the error res = str(e) - socket.send_multipart(["JOBRES", msg[1], res]) + socket.send(':::'.join(["JOBRES", msg[1], res])) self._msg('REMOVE', 'REMOVED') diff --git a/powerhose/jobrunner.py b/powerhose/jobrunner.py index 92333fb..ee70992 100644 --- a/powerhose/jobrunner.py +++ b/powerhose/jobrunner.py @@ -65,12 +65,13 @@ def _execute(self, job_id, job_data, timeout=1.): raise TimeoutError() for socket in events: - msg = socket.recv_multipart() + msg = socket.recv().split(':::') if msg == ['GIVE']: # the worker is ready to get some job done - socket.send_multipart(["JOB", str(job_id), - job_data], - zmq.NOBLOCK) + socket.send(':::'.join(["JOB", str(job_id), + job_data]), + zmq.NOBLOCK) + elif msg[0] == 'JOBRES': # we got a result return msg[-1] diff --git a/powerhose/workermgr.py b/powerhose/workermgr.py index 29f4ade..6d7184b 100644 --- a/powerhose/workermgr.py +++ b/powerhose/workermgr.py @@ -110,7 +110,9 @@ def run(self): break for socket in events: - msg = socket.recv_multipart() + msg = socket.recv().split(':::') + print msg + if msg[-2] == 'PING': if msg[-1] not in self.workers: name = msg[-1]