Skip to content

Commit

Permalink
Python 3 compat for Chapter 2
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Jan 7, 2014
1 parent df05fc7 commit 651ae6b
Show file tree
Hide file tree
Showing 15 changed files with 69 additions and 87 deletions.
25 changes: 10 additions & 15 deletions examples/Python/interrupt.py
@@ -1,14 +1,14 @@
#
# Shows how to handle Ctrl-C
#
import zmq
import signal
import time
import zmq

interrupted = False

def signal_handler(signum, frame):
global interrupted
interrupted = True
print("W: custom interrupt handler called.")

context = zmq.Context()
socket = context.socket(zmq.REP)
Expand All @@ -18,17 +18,12 @@ def signal_handler(signum, frame):
try:
socket.recv()
except KeyboardInterrupt:
print "W: interrupt received, proceeding..."
print("W: interrupt received, proceeding...")

# or you can use a custom handler
counter = 0
# or you can use a custom handler,
# in which case recv will fail with EINTR
signal.signal(signal.SIGINT, signal_handler)
while True:
try:
message = socket.recv(zmq.DONTWAIT)
except zmq.ZMQError:
pass
counter += 1
if interrupted:
print "W: interrupt received, killing server..."
break
try:
message = socket.recv()
except zmq.ZMQError as e:
print("W: recv failed with: %s" % e)
2 changes: 1 addition & 1 deletion examples/Python/msgqueue.py
Expand Up @@ -13,7 +13,7 @@
def main():
""" main method """

context = zmq.Context(1)
context = zmq.Context()

# Socket facing clients
frontend = context.socket(zmq.ROUTER)
Expand Down
11 changes: 7 additions & 4 deletions examples/Python/mspoller.py
Expand Up @@ -18,7 +18,7 @@
# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Initialize poll set
poller = zmq.Poller()
Expand All @@ -27,12 +27,15 @@

# Process messages from both sockets
while True:
socks = dict(poller.poll())
try:
socks = dict(poller.poll())
except KeyboardInterrupt:
break

if receiver in socks and socks[receiver] == zmq.POLLIN:
if receiver in socks:
message = receiver.recv()
# process task

if subscriber in socks and socks[subscriber] == zmq.POLLIN:
if subscriber in socks:
message = subscriber.recv()
# process weather update
10 changes: 5 additions & 5 deletions examples/Python/msreader.py
Expand Up @@ -19,7 +19,7 @@
# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Process messages from both sockets
# We prioritize traffic from the task ventilator
Expand All @@ -28,16 +28,16 @@
# Process any waiting tasks
while True:
try:
rc = receiver.recv(zmq.DONTWAIT)
except zmq.ZMQError:
msg = receiver.recv(zmq.DONTWAIT)
except zmq.Again:
break
# process task

# Process any waiting weather updates
while True:
try:
rc = subscriber.recv(zmq.DONTWAIT)
except zmq.ZMQError:
msg = subscriber.recv(zmq.DONTWAIT)
except zmq.Again:
break
# process weather update

Expand Down
30 changes: 13 additions & 17 deletions examples/Python/mtrelay.py
Expand Up @@ -9,59 +9,55 @@
import threading
import zmq

def step1(context):
""" step1 """

def step1(context=None):
"""Step 1"""
context = context or zmq.Context.instance()
# Signal downstream to step 2
sender = context.socket(zmq.PAIR)
sender.connect("inproc://step2")

sender.send("")


sender.send(b"")

def step2(context):
""" step2 """

def step2(context=None):
"""Step 2"""
context = context or zmq.Context.instance()
# Bind to inproc: endpoint, then start upstream thread
receiver = context.socket(zmq.PAIR)
receiver.bind("inproc://step2")

thread = threading.Thread(target=step1, args=(context, ))
thread = threading.Thread(target=step1)
thread.start()

# Wait for signal
string = receiver.recv()
msg = receiver.recv()

# Signal downstream to step 3
sender = context.socket(zmq.PAIR)
sender.connect("inproc://step3")
sender.send("")
sender.send(b"")

return

def main():
""" server routine """
# Prepare our context and sockets
context = zmq.Context(1)
context = zmq.Context.instance()

# Bind to inproc: endpoint, then start upstream thread
receiver = context.socket(zmq.PAIR)
receiver.bind("inproc://step3")

thread = threading.Thread(target=step2, args=(context, ))
thread = threading.Thread(target=step2)
thread.start()

# Wait for signal
string = receiver.recv()

print("Test successful!\n")
print("Test successful!")

receiver.close()
context.term()

return


if __name__ == "__main__":
main()
16 changes: 8 additions & 8 deletions examples/Python/mtserver.py
Expand Up @@ -9,9 +9,9 @@
import threading
import zmq

def worker_routine(worker_url, context):
""" Worker routine """

def worker_routine(worker_url, context=None):
"""Worker routine"""
context = context or zmq.Context.instance()
# Socket to talk to dispatcher
socket = context.socket(zmq.REP)

Expand All @@ -21,22 +21,22 @@ def worker_routine(worker_url, context):

string = socket.recv()

print("Received request: [%s]\n" % (string))
print("Received request: [ %s ]" % (string))

# do some 'work'
time.sleep(1)

#send reply back to client
socket.send("World")
socket.send(b"World")

def main():
""" server routine """
"""Server routine"""

url_worker = "inproc://workers"
url_client = "tcp://*:5555"

# Prepare our context and sockets
context = zmq.Context(1)
context = zmq.Context.instance()

# Socket to talk to clients
clients = context.socket(zmq.ROUTER)
Expand All @@ -48,7 +48,7 @@ def main():

# Launch pool of worker threads
for i in range(5):
thread = threading.Thread(target=worker_routine, args=(url_worker, context, ))
thread = threading.Thread(target=worker_routine, args=(url_worker,))
thread.start()

zmq.device(zmq.QUEUE, clients, workers)
Expand Down
8 changes: 4 additions & 4 deletions examples/Python/psenvpub.py
Expand Up @@ -9,17 +9,17 @@
import zmq

def main():
""" main method """
"""main method"""

# Prepare our context and publisher
context = zmq.Context(1)
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5563")

while True:
# Write two messages, each with an envelope and content
publisher.send_multipart(["A", "We don't want to see this"])
publisher.send_multipart(["B", "We would like to see this"])
publisher.send_multipart([b"A", b"We don't want to see this"])
publisher.send_multipart([b"B", b"We would like to see this"])
time.sleep(1)

# We never get here but clean up anyhow
Expand Down
6 changes: 3 additions & 3 deletions examples/Python/psenvsub.py
Expand Up @@ -11,15 +11,15 @@ def main():
""" main method """

# Prepare our context and publisher
context = zmq.Context(1)
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
subscriber.setsockopt(zmq.SUBSCRIBE, "B")
subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

while True:
# Read envelope with address
[address, contents] = subscriber.recv_multipart()
print("[%s] %s\n" % (address, contents))
print("[%s] %s" % (address, contents))

# We never get here but clean up anyhow
subscriber.close()
Expand Down
16 changes: 4 additions & 12 deletions examples/Python/rrbroker.py
Expand Up @@ -21,17 +21,9 @@
socks = dict(poller.poll())

if socks.get(frontend) == zmq.POLLIN:
message = frontend.recv()
more = frontend.getsockopt(zmq.RCVMORE)
if more:
backend.send(message, zmq.SNDMORE)
else:
backend.send(message)
message = frontend.recv_multipart()
backend.send_multipart(message)

if socks.get(backend) == zmq.POLLIN:
message = backend.recv()
more = backend.getsockopt(zmq.RCVMORE)
if more:
frontend.send(message, zmq.SNDMORE)
else:
frontend.send(message)
message = backend.recv_multipart()
frontend.send_multipart(message)
4 changes: 2 additions & 2 deletions examples/Python/rrclient.py
Expand Up @@ -12,6 +12,6 @@

# Do 10 requests, waiting each time for a response
for request in range(1,11):
socket.send("Hello")
socket.send(b"Hello")
message = socket.recv()
print "Received reply ", request, "[", message, "]"
print("Received reply %s [%s]" % (request, message))
4 changes: 2 additions & 2 deletions examples/Python/rrworker.py
Expand Up @@ -11,5 +11,5 @@

while True:
message = socket.recv()
print "Received request: ", message
socket.send("World")
print("Received request: %s" % message)
socket.send(b"World")
2 changes: 1 addition & 1 deletion examples/Python/syncpub.py
Expand Up @@ -27,7 +27,7 @@ def main():
# send synchronization reply
syncservice.send(b'')
subscribers += 1
print ("+1 subscriber")
print("+1 subscriber (%i/%i)" % (subscribers, SUBSCRIBERS_EXPECTED))

# Now broadcast exactly 1M updates followed by END
for i in range(1000000):
Expand Down
6 changes: 3 additions & 3 deletions examples/Python/tasksink2.py
Expand Up @@ -27,7 +27,7 @@
tstart = time.time()

# Process 100 confirmiations
for task_nbr in xrange(100):
for task_nbr in range(100):
receiver.recv()
if task_nbr % 10 == 0:
sys.stdout.write(":")
Expand All @@ -39,10 +39,10 @@
tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec
print("Total elapsed time: %d msec" % total_msec)

# Send kill signal to workers
controller.send("KILL")
controller.send(b"KILL")

# Finished
time.sleep(1) # Give 0MQ time to deliver
6 changes: 3 additions & 3 deletions examples/Python/taskwork2.py
Expand Up @@ -23,7 +23,7 @@
# Socket for control input
controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, "")
controller.setsockopt(zmq.SUBSCRIBE, b"")

# Process messages from receiver and controller
poller = zmq.Poller()
Expand All @@ -34,7 +34,7 @@
socks = dict(poller.poll())

if socks.get(receiver) == zmq.POLLIN:
message = receiver.recv()
message = receiver.recv_string()

# Process task
workload = int(message) # Workload in msecs
Expand All @@ -43,7 +43,7 @@
time.sleep(workload / 1000.0)

# Send results to sink
sender.send(message)
sender.send_string(message)

# Simple progress indicator for the viewer
sys.stdout.write(".")
Expand Down
10 changes: 3 additions & 7 deletions examples/Python/wuproxy.py
Expand Up @@ -15,14 +15,10 @@
backend.bind("tcp://10.1.1.0:8100")

# Subscribe on everything
frontend.setsockopt(zmq.SUBSCRIBE, '')
frontend.setsockopt(zmq.SUBSCRIBE, b'')

# Shunt messages out to our own subscribers
while True:
# Process all parts of the message
message = frontend.recv()
more = frontend.getsockopt(zmq.RCVMORE)
if more:
backend.send(message, zmq.SNDMORE)
else:
backend.send(message) # last message part
message = frontend.recv_multipart()
backend.send_multipart(message)

0 comments on commit 651ae6b

Please sign in to comment.