Skip to content

Commit

Permalink
added ZPublisher, thread based
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Upadhyay committed Jan 4, 2011
1 parent 4accd19 commit 0b79048
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion dutils/zutils.py
@@ -1,8 +1,9 @@
import zmq, threading, time
import zmq, threading, time, Queue

CONTEXT = zmq.Context()
ZNull = zmq.Message(None)

# multi helpers # {{{
def recv_multi(sock):
parts = []
while True:
Expand All @@ -16,9 +17,32 @@ def send_multi(sock, parts, reply=None):
for part in parts[:-1]:
sock.send(part, zmq.SNDMORE)
sock.send(parts[-1], 0)
# }}}

class NoReply(Exception): pass

# ZPublisher # {{{
class ZPublisher(threading.Thread):
def __init__(self, bind):
super(ZPublisher, self).__init__()
self.daemon = True
self.bind = bind
self.q = Queue.Queue

def publish(self, msg): self.q.put(msg)
def shutdown(self): self.publish("ZPublisher.Shutdown")
def run(self):
self.socket = CONTEXT.socket(zmq.PUB)
self.socket.bind(self.bind)
while True:
msg = self.q.get()
if msg == "ZPublisher.Shutdown":
self.socket.close()
break
self.socket.send(msg)
# }}}

# ZReplier # {{{
class ZReplier(threading.Thread):

def __init__(self, bind):
Expand Down Expand Up @@ -120,6 +144,7 @@ def loop(self):
self.shutdown()
self.join()
print "Terminated."
# }}}

def query_maker(socket=None, bind=None):
if not socket:
Expand Down

0 comments on commit 0b79048

Please sign in to comment.