Skip to content

Commit

Permalink
removed the multipart usage - we'll own single msg
Browse files Browse the repository at this point in the history
  • Loading branch information
tarekziade committed Jan 30, 2012
1 parent 5928d29 commit 410a552
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 12 deletions.
2 changes: 2 additions & 0 deletions examples/square_master.py
Expand Up @@ -27,6 +27,8 @@
runner.stop()
print 'bye'
except Exception, e:
import pdb; pdb.set_trace()
print str(e)
print 'Something went wrong (are we still running workers?)'
runner.stop()
print 'bye'
4 changes: 2 additions & 2 deletions powerhose/client/pinger.py
Expand Up @@ -48,8 +48,8 @@ def run(self):

with self.locker:
try:
self.socket.send_multipart(['PING', self.identity],
zmq.NOBLOCK)
self.socket.send(':::'.join(['PING', self.identity]),
zmq.NOBLOCK)
except zmq.ZMQError, e:
num_failed += 1
continue
Expand Down
10 changes: 5 additions & 5 deletions powerhose/client/worker.py
Expand Up @@ -3,7 +3,7 @@
import zmq
import threading

from powerhose.pinger import Pinger
from powerhose.client.pinger import Pinger


class RegisterError(Exception):
Expand Down Expand Up @@ -50,8 +50,8 @@ def _msg(self, req, rep):

# ping the master we are online, with an ID
try:
self.master.send_multipart([req, self.identity],
zmq.NOBLOCK)
self.master.send(':::'.join([req, self.identity]),
zmq.NOBLOCK)
except zmq.ZMQError:
raise RegisterError()

Expand Down Expand Up @@ -88,7 +88,7 @@ def run(self):
break

for socket in events:
msg = socket.recv_multipart()
msg = socket.recv().split(':::')
if msg == ['WAKE']:
# yeah I can work
socket.send('GIVE')
Expand All @@ -101,6 +101,6 @@ def run(self):
# XXX log the error
res = str(e)

socket.send_multipart(["JOBRES", msg[1], res])
socket.send(':::'.join(["JOBRES", msg[1], res]))

self._msg('REMOVE', 'REMOVED')
9 changes: 5 additions & 4 deletions powerhose/jobrunner.py
Expand Up @@ -65,12 +65,13 @@ def _execute(self, job_id, job_data, timeout=1.):
raise TimeoutError()

for socket in events:
msg = socket.recv_multipart()
msg = socket.recv().split(':::')
if msg == ['GIVE']:
# the worker is ready to get some job done
socket.send_multipart(["JOB", str(job_id),
job_data],
zmq.NOBLOCK)
socket.send(':::'.join(["JOB", str(job_id),
job_data]),
zmq.NOBLOCK)

elif msg[0] == 'JOBRES':
# we got a result
return msg[-1]
Expand Down
4 changes: 3 additions & 1 deletion powerhose/workermgr.py
Expand Up @@ -110,7 +110,9 @@ def run(self):
break

for socket in events:
msg = socket.recv_multipart()
msg = socket.recv().split(':::')
print msg

if msg[-2] == 'PING':
if msg[-1] not in self.workers:
name = msg[-1]
Expand Down

0 comments on commit 410a552

Please sign in to comment.