Skip to content

Commit

Permalink
Fix lbbroker[23].py for python2 and python3
Browse files Browse the repository at this point in the history
Python examples lbbroker2.py and lbbroker3.py failed to run
on both python2 and python3.

So i made minimal changes to make it possible.

I also fixed few PEP8 problems my editor complained about, mainly
long lines and spaces on wrong places.
  • Loading branch information
David Pravec committed Apr 14, 2015
1 parent e0ff469 commit 64332f3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 32 deletions.
49 changes: 26 additions & 23 deletions examples/Python/lbbroker2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
"""

from __future__ import print_function
import threading
import time
import zmq

NBR_CLIENTS = 10
NBR_WORKERS = 3


def worker_thread(worker_url, context, i):
""" Worker using REQ socket to do LRU routing """

Expand All @@ -24,16 +26,17 @@ def worker_thread(worker_url, context, i):

socket.connect(worker_url)

# Tell the borker we are ready for work
# Tell the broker we are ready for work
socket.send(b"READY")

try:
while True:

address, empty, request = socket.recv_multipart()

print("%s: %s\n" % (socket.identity.decode('ascii'), request.decode('ascii')), end='')

print("%s: %s\n" % (socket.identity.decode('ascii'),
request.decode('ascii')), end='')

socket.send_multipart([address, b'', b'OK'])

except zmq.ContextTerminated:
Expand All @@ -45,7 +48,7 @@ def client_thread(client_url, context, i):
""" Basic request-reply client using REQ socket """

socket = context.socket(zmq.REQ)

# Set client identity. Makes tracing easier
socket.identity = (u"Client-%d" % (i)).encode('ascii')

Expand All @@ -55,7 +58,8 @@ def client_thread(client_url, context, i):
socket.send(b"HELLO")
reply = socket.recv()

print("%s: %s\n" % (socket.identity.decode('ascii'), reply.decode('ascii')), end='')
print("%s: %s\n" % (socket.identity.decode('ascii'),
reply.decode('ascii')), end='')


def main():
Expand All @@ -72,15 +76,15 @@ def main():
backend = context.socket(zmq.ROUTER)
backend.bind(url_worker)



# create workers and clients threads
for i in range(NBR_WORKERS):
thread = threading.Thread(target=worker_thread, args=(url_worker, context, i, ))
thread = threading.Thread(target=worker_thread,
args=(url_worker, context, i, ))
thread.start()

for i in range(NBR_CLIENTS):
thread_c = threading.Thread(target=client_thread, args=(url_client, context, i, ))
thread_c = threading.Thread(target=client_thread,
args=(url_client, context, i, ))
thread_c.start()

# Logic of LRU loop
Expand All @@ -91,7 +95,7 @@ def main():

# Queue of available workers
available_workers = 0
workers_list = []
workers_list = []

# init poller
poller = zmq.Poller()
Expand All @@ -111,7 +115,6 @@ def main():

# Queue worker address for LRU routing
message = backend.recv_multipart()

assert available_workers < NBR_WORKERS

worker_addr = message[0]
Expand All @@ -121,22 +124,22 @@ def main():
workers_list.append(worker_addr)

# Second frame is empty
empty = message[1]
assert empty == ""
empty = message[1]
assert empty == b""

# Third frame is READY or else a client reply address
client_addr = message[2]

# If client reply, send rest back to frontend
if client_addr != "READY":
if client_addr != b'READY':

# Following frame is empty
empty = message[3]
assert empty == ""
assert empty == b""

reply = message[4]

frontend.send_multipart([client_addr, "", reply])
frontend.send_multipart([client_addr, b"", reply])

client_nbr -= 1

Expand All @@ -150,19 +153,19 @@ def main():
# Now get next client request, route to LRU worker
# Client request is [address][empty][request]

[client_addr, empty, request ] = frontend.recv_multipart()
[client_addr, empty, request] = frontend.recv_multipart()

assert empty == ""
assert empty == b""

# Dequeue and drop the next worker address
available_workers -= 1
available_workers += -1
worker_id = workers_list.pop()

backend.send_multipart([worker_id, "", client_addr, "", request])

backend.send_multipart([worker_id, b"",
client_addr, b"", request])

#out of infinite loop: do some housekeeping
time.sleep (1)
time.sleep(1)

frontend.close()
backend.close()
Expand Down
25 changes: 16 additions & 9 deletions examples/Python/lbbroker3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
context and conceptually acts as a separate process.
Author: Min RK <benjaminrk(at)gmail(dot)com>
Adapted from lruqueue.py by Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
Adapted from lruqueue.py by
Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
"""

from __future__ import print_function
import threading
import time
import zmq
Expand All @@ -22,6 +24,7 @@
NBR_CLIENTS = 10
NBR_WORKERS = 3


def worker_thread(worker_url, i):
""" Worker using REQ socket to do LRU routing """
context = zmq.Context.instance()
Expand All @@ -33,16 +36,17 @@ def worker_thread(worker_url, i):

socket.connect(worker_url)

# Tell the borker we are ready for work
# Tell the broker we are ready for work
socket.send(b"READY")

try:
while True:

address, empty, request = socket.recv_multipart()

print("%s: %s\n" % (socket.identity.decode('ascii'), request.decode('ascii')), end='')

print("%s: %s\n" % (socket.identity.decode('ascii'),
request.decode('ascii')), end='')

socket.send_multipart([address, b'', b'OK'])

except zmq.ContextTerminated:
Expand All @@ -55,7 +59,7 @@ def client_thread(client_url, i):
context = zmq.Context.instance()

socket = context.socket(zmq.REQ)

# Set client identity. Makes tracing easier
socket.identity = (u"Client-%d" % (i)).encode('ascii')

Expand All @@ -65,7 +69,8 @@ def client_thread(client_url, i):
socket.send(b"HELLO")
reply = socket.recv()

print("%s: %s\n" % (socket.identity.decode('ascii'), reply.decode('ascii')), end='')
print("%s: %s\n" % (socket.identity.decode('ascii'),
reply.decode('ascii')), end='')


class LRUQueue(object):
Expand Down Expand Up @@ -109,7 +114,7 @@ def handle_backend(self, msg):

if self.client_nbr == 0:
# Exit after N messages
self.loop.add_timeout(time.time()+1, self.loop.stop)
self.loop.add_timeout(time.time() + 1, self.loop.stop)

if self.available_workers == 1:
# on first recv, start accepting frontend messages
Expand All @@ -131,6 +136,7 @@ def handle_frontend(self, msg):
# stop receiving until workers become available again
self.frontend.stop_on_recv()


def main():
"""main method"""

Expand All @@ -151,7 +157,8 @@ def main():
thread.start()

for i in range(NBR_CLIENTS):
thread_c = threading.Thread(target=client_thread, args=(url_client, i, ))
thread_c = threading.Thread(target=client_thread,
args=(url_client, i, ))
thread_c.daemon = True
thread_c.start()

Expand Down

0 comments on commit 64332f3

Please sign in to comment.