Skip to content

Commit

Permalink
ZQueueConsumer and ZPublisher bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Upadhyay committed Jan 4, 2011
1 parent dd976e4 commit f6d62a9
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
3 changes: 1 addition & 2 deletions dutils/zqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
DURATION = 5

def log(msg):
return
print "[%s]: %s" % (time.asctime(), msg)

# BDBPersistentQueue # {{{
Expand Down Expand Up @@ -314,7 +313,7 @@ def __init__(self, bind, namespace):
def process(self, item): pass

def run(self):
q = query_maker(self.bind)
q = query_maker(bind=self.bind)
while True:
msg = q("%s:get" % self.namespace)
if msg == "ZQueue.Shutdown": continue
Expand Down
5 changes: 3 additions & 2 deletions dutils/zutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ def __init__(self, bind):
super(ZPublisher, self).__init__()
self.daemon = True
self.bind = bind
self.q = Queue.Queue
self.q = Queue.Queue()
self.start()

def publish(self, msg): self.q.put(msg)
def shutdown(self): self.publish("ZPublisher.Shutdown")
def run(self):
self.socket = CONTEXT.socket(zmq.PUB)
self.socket.bind(self.bind)
print "ZPublisher listening on", self.bind
while True:
msg = self.q.get()
if msg == "ZPublisher.Shutdown":
Expand All @@ -59,7 +61,6 @@ def log(self, message):
def thread_init(self):
self.socket = CONTEXT.socket(zmq.XREP)
try:
print self.bind
self.socket.bind(self.bind)
except zmq.ZMQError, e:
print e, "while binding on", self.bind
Expand Down

0 comments on commit f6d62a9

Please sign in to comment.